You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by el...@apache.org on 2008/11/07 02:03:31 UTC
svn commit: r712028 [1/2] - in /mina/branches/mina-new-chain2:
core/src/main/java/org/apache/mina/core/filterchain/
core/src/main/java/org/apache/mina/core/polling/
core/src/main/java/org/apache/mina/core/service/
core/src/main/java/org/apache/mina/cor...
Author: elecharny
Date: Thu Nov 6 17:03:23 2008
New Revision: 712028
URL: http://svn.apache.org/viewvc?rev=712028&view=rev
Log:
Another experiment with the filter chain:
- removed all the chain builders
- replaced them with a simple ArrayList in the service
- replaced them with a CoWAL (CopyOnWriteArrayList) in sessions
- Removed the Entry, EntryImpl, NextFilter and HeadFilter classes/interfaces
- Extracted the TailFilter (it's now a first-class citizen)
As a result, there are a hell of a lot of compilation errors, but at least the chat server is now accepting connections and the stack is limited to 4 elements when traversing the chain.
There is more to do: the write chain is not created yet, and there are a lot of filters which need to be fixed.
I think it's a better way to handle chains than what I did in my other branch... More to come!
Added:
mina/branches/mina-new-chain2/core/src/main/java/org/apache/mina/core/filterchain/TailFilter.java
Removed:
mina/branches/mina-new-chain2/core/src/main/java/org/apache/mina/core/filterchain/DefaultIoFilterChain.java
mina/branches/mina-new-chain2/core/src/main/java/org/apache/mina/core/filterchain/DefaultIoFilterChainBuilder.java
mina/branches/mina-new-chain2/core/src/main/java/org/apache/mina/core/filterchain/IoFilterChain.java
mina/branches/mina-new-chain2/core/src/main/java/org/apache/mina/core/filterchain/IoFilterChainBuilder.java
Modified:
mina/branches/mina-new-chain2/core/src/main/java/org/apache/mina/core/filterchain/IoFilter.java
mina/branches/mina-new-chain2/core/src/main/java/org/apache/mina/core/filterchain/IoFilterAdapter.java
mina/branches/mina-new-chain2/core/src/main/java/org/apache/mina/core/filterchain/IoFilterEvent.java
mina/branches/mina-new-chain2/core/src/main/java/org/apache/mina/core/polling/AbstractPollingConnectionlessIoAcceptor.java
mina/branches/mina-new-chain2/core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoProcessor.java
mina/branches/mina-new-chain2/core/src/main/java/org/apache/mina/core/service/AbstractIoService.java
mina/branches/mina-new-chain2/core/src/main/java/org/apache/mina/core/service/IoProcessor.java
mina/branches/mina-new-chain2/core/src/main/java/org/apache/mina/core/service/IoService.java
mina/branches/mina-new-chain2/core/src/main/java/org/apache/mina/core/service/IoServiceListenerSupport.java
mina/branches/mina-new-chain2/core/src/main/java/org/apache/mina/core/service/SimpleIoProcessorPool.java
mina/branches/mina-new-chain2/core/src/main/java/org/apache/mina/core/session/AbstractIoSession.java
mina/branches/mina-new-chain2/core/src/main/java/org/apache/mina/core/session/DummySession.java
mina/branches/mina-new-chain2/core/src/main/java/org/apache/mina/core/session/IdleStatusChecker.java
mina/branches/mina-new-chain2/core/src/main/java/org/apache/mina/core/session/IoEvent.java
mina/branches/mina-new-chain2/core/src/main/java/org/apache/mina/core/session/IoSession.java
mina/branches/mina-new-chain2/core/src/main/java/org/apache/mina/filter/buffer/BufferedWriteFilter.java
mina/branches/mina-new-chain2/core/src/main/java/org/apache/mina/filter/codec/ProtocolCodecFilter.java
mina/branches/mina-new-chain2/core/src/main/java/org/apache/mina/filter/codec/ProtocolDecoderOutput.java
mina/branches/mina-new-chain2/core/src/main/java/org/apache/mina/filter/codec/ProtocolEncoderOutput.java
mina/branches/mina-new-chain2/core/src/main/java/org/apache/mina/filter/logging/LoggingFilter.java
mina/branches/mina-new-chain2/core/src/main/java/org/apache/mina/filter/util/CommonEventFilter.java
mina/branches/mina-new-chain2/core/src/main/java/org/apache/mina/transport/socket/nio/NioDatagramSession.java
mina/branches/mina-new-chain2/core/src/main/java/org/apache/mina/transport/socket/nio/NioSession.java
mina/branches/mina-new-chain2/core/src/main/java/org/apache/mina/transport/socket/nio/NioSocketSession.java
mina/branches/mina-new-chain2/example/src/main/java/org/apache/mina/example/chat/Main.java
mina/branches/mina-new-chain2/example/src/main/java/org/apache/mina/example/chat/client/ChatClientSupport.java
Modified: mina/branches/mina-new-chain2/core/src/main/java/org/apache/mina/core/filterchain/IoFilter.java
URL: http://svn.apache.org/viewvc/mina/branches/mina-new-chain2/core/src/main/java/org/apache/mina/core/filterchain/IoFilter.java?rev=712028&r1=712027&r2=712028&view=diff
==============================================================================
--- mina/branches/mina-new-chain2/core/src/main/java/org/apache/mina/core/filterchain/IoFilter.java (original)
+++ mina/branches/mina-new-chain2/core/src/main/java/org/apache/mina/core/filterchain/IoFilter.java Thu Nov 6 17:03:23 2008
@@ -49,13 +49,13 @@
* <ol>
* <li>{@link #init()} is invoked by {@link ReferenceCountingFilter} if
* the filter is added at the first time.</li>
- * <li>{@link #onPreAdd(IoFilterChain, String, NextFilter)} is invoked to notify
+ * <li>{@link #onPreAdd(IoFilterChain, String, IoFilter)} is invoked to notify
* that the filter will be added to the chain.</li>
* <li>The filter is added to the chain, and all events and I/O requests
* pass through the filter from now.</li>
- * <li>{@link #onPostAdd(IoFilterChain, String, NextFilter)} is invoked to notify
+ * <li>{@link #onPostAdd(IoFilterChain, String, IoFilter)} is invoked to notify
* that the filter is added to the chain.</li>
- * <li>The filter is removed from the chain if {@link #onPostAdd(IoFilterChain, String, org.apache.mina.core.filterchain.IoFilter.NextFilter)}
+ * <li>The filter is removed from the chain if {@link #onPostAdd(IoFilterChain, String, IoFilter)}
* threw an exception. {@link #destroy()} is also invoked by
* {@link ReferenceCountingFilter} if the filter is the last filter which
* was added to {@link IoFilterChain}s.</li>
@@ -63,11 +63,11 @@
* <p>
* When you remove an {@link IoFilter} from an {@link IoFilterChain}:
* <ol>
- * <li>{@link #onPreRemove(IoFilterChain, String, NextFilter)} is invoked to
+ * <li>{@link #onPreRemove(IoFilterChain, String, IoFilter)} is invoked to
* notify that the filter will be removed from the chain.</li>
* <li>The filter is removed from the chain, and any events and I/O requests
* don't pass through the filter from now.</li>
- * <li>{@link #onPostRemove(IoFilterChain, String, NextFilter)} is invoked to
+ * <li>{@link #onPostRemove(IoFilterChain, String, IoFilter)} is invoked to
* notify that the filter is removed from the chain.</li>
* <li>{@link #destroy()} is invoked by {@link ReferenceCountingFilter} if
* the removed filter was the last one.</li>
@@ -103,10 +103,10 @@
*
* @param parent the parent who called this method
* @param name the name assigned to this filter
- * @param nextFilter the {@link NextFilter} for this filter. You can reuse
+ * @param filter the next {@link IoFilter} for this filter. You can reuse
* this object until this filter is removed from the chain.
*/
- void onPreAdd(IoFilterChain parent, String name, NextFilter nextFilter)
+ void onPreAdd(IoSession session, int index, String name, IoFilter filter)
throws Exception;
/**
@@ -117,10 +117,10 @@
*
* @param parent the parent who called this method
* @param name the name assigned to this filter
- * @param nextFilter the {@link NextFilter} for this filter. You can reuse
+ * @param filter the next {@link IoFilter} for this filter. You can reuse
* this object until this filter is removed from the chain.
*/
- void onPostAdd(IoFilterChain parent, String name, NextFilter nextFilter)
+ void onPostAdd(IoSession session, int index, String name, IoFilter filter)
throws Exception;
/**
@@ -131,10 +131,10 @@
*
* @param parent the parent who called this method
* @param name the name assigned to this filter
- * @param nextFilter the {@link NextFilter} for this filter. You can reuse
+ * @param filter the {@link IoFilter} for this filter. You can reuse
* this object until this filter is removed from the chain.
*/
- void onPreRemove(IoFilterChain parent, String name, NextFilter nextFilter)
+ void onPreRemove(IoSession session, int index, String name, IoFilter filter)
throws Exception;
/**
@@ -145,127 +145,77 @@
*
* @param parent the parent who called this method
* @param name the name assigned to this filter
- * @param nextFilter the {@link NextFilter} for this filter. You can reuse
+ * @param filter the {@link IoFilter} for this filter. You can reuse
* this object until this filter is removed from the chain.
*/
- void onPostRemove(IoFilterChain parent, String name, NextFilter nextFilter)
+ void onPostRemove(IoSession session, int index, String name, IoFilter filter)
throws Exception;
/**
* Filters {@link IoHandler#sessionCreated(IoSession)} event.
*/
- void sessionCreated(NextFilter nextFilter, IoSession session)
+ void sessionCreated(int index, IoSession session)
throws Exception;
/**
* Filters {@link IoHandler#sessionOpened(IoSession)} event.
*/
- void sessionOpened(NextFilter nextFilter, IoSession session)
+ void sessionOpened(int index, IoSession session)
throws Exception;
/**
* Filters {@link IoHandler#sessionClosed(IoSession)} event.
*/
- void sessionClosed(NextFilter nextFilter, IoSession session)
+ void sessionClosed(int index, IoSession session)
throws Exception;
/**
* Filters {@link IoHandler#sessionIdle(IoSession,IdleStatus)}
* event.
*/
- void sessionIdle(NextFilter nextFilter, IoSession session, IdleStatus status)
+ void sessionIdle(int index, IoSession session, IdleStatus status)
throws Exception;
/**
* Filters {@link IoHandler#exceptionCaught(IoSession,Throwable)}
* event.
*/
- void exceptionCaught(NextFilter nextFilter, IoSession session,
+ void exceptionCaught(int index, IoSession session,
Throwable cause) throws Exception;
/**
* Filters {@link IoHandler#messageReceived(IoSession,Object)}
* event.
*/
- void messageReceived(NextFilter nextFilter, IoSession session,
+ void messageReceived(int index, IoSession session,
Object message) throws Exception;
/**
* Filters {@link IoHandler#messageSent(IoSession,Object)}
* event.
*/
- void messageSent(NextFilter nextFilter, IoSession session,
+ void messageSent(int index, IoSession session,
WriteRequest writeRequest) throws Exception;
/**
* Filters {@link IoSession#close()} method invocation.
*/
- void filterClose(NextFilter nextFilter, IoSession session) throws Exception;
+ void filterClose(int index, IoSession session) throws Exception;
/**
* Filters {@link IoSession#write(Object)} method invocation.
*/
- void filterWrite(NextFilter nextFilter, IoSession session,
+ void filterWrite(int index, IoSession session,
WriteRequest writeRequest) throws Exception;
/**
* Filters {@link IoSession#setTrafficMask(TrafficMask)} method invocation.
*/
void filterSetTrafficMask(
- NextFilter nextFilter, IoSession session, TrafficMask trafficMask) throws Exception;
-
+ int index, IoSession session, TrafficMask trafficMask) throws Exception;
+
/**
- * Represents the next {@link IoFilter} in {@link IoFilterChain}.
+ * @return The filter's name
*/
- public interface NextFilter {
- /**
- * Forwards <tt>sessionCreated</tt> event to next filter.
- */
- void sessionCreated(IoSession session);
-
- /**
- * Forwards <tt>sessionOpened</tt> event to next filter.
- */
- void sessionOpened(IoSession session);
-
- /**
- * Forwards <tt>sessionClosed</tt> event to next filter.
- */
- void sessionClosed(IoSession session);
-
- /**
- * Forwards <tt>sessionIdle</tt> event to next filter.
- */
- void sessionIdle(IoSession session, IdleStatus status);
-
- /**
- * Forwards <tt>exceptionCaught</tt> event to next filter.
- */
- void exceptionCaught(IoSession session, Throwable cause);
-
- /**
- * Forwards <tt>messageReceived</tt> event to next filter.
- */
- void messageReceived(IoSession session, Object message);
-
- /**
- * Forwards <tt>messageSent</tt> event to next filter.
- */
- void messageSent(IoSession session, WriteRequest writeRequest);
-
- /**
- * Forwards <tt>filterWrite</tt> event to next filter.
- */
- void filterWrite(IoSession session, WriteRequest writeRequest);
-
- /**
- * Forwards <tt>filterClose</tt> event to next filter.
- */
- void filterClose(IoSession session);
-
- /**
- * Forwards <tt>filterSetTrafficMask</tt> event to next filter.
- */
- void filterSetTrafficMask(IoSession session, TrafficMask trafficMask);
- }
+ String getName();
}
Modified: mina/branches/mina-new-chain2/core/src/main/java/org/apache/mina/core/filterchain/IoFilterAdapter.java
URL: http://svn.apache.org/viewvc/mina/branches/mina-new-chain2/core/src/main/java/org/apache/mina/core/filterchain/IoFilterAdapter.java?rev=712028&r1=712027&r2=712028&view=diff
==============================================================================
--- mina/branches/mina-new-chain2/core/src/main/java/org/apache/mina/core/filterchain/IoFilterAdapter.java (original)
+++ mina/branches/mina-new-chain2/core/src/main/java/org/apache/mina/core/filterchain/IoFilterAdapter.java Thu Nov 6 17:03:23 2008
@@ -33,6 +33,9 @@
* @version $Rev: 591770 $, $Date: 2007-11-04 13:22:44 +0100 (Sun, 04 Nov 2007) $
*/
public class IoFilterAdapter implements IoFilter {
+ /** The filter's name */
+ private String name;
+
/**
* {@inheritDoc}
*/
@@ -44,113 +47,120 @@
*/
public void destroy() throws Exception {
}
+
+ /**
+ * {@inheritDoc}
+ */
+ public String getName() {
+ return name;
+ }
/**
* {@inheritDoc}
*/
- public void onPreAdd(IoFilterChain parent, String name,
- NextFilter nextFilter) throws Exception {
+ public void onPreAdd(IoSession session, int pos, String name,
+ IoFilter filter) throws Exception {
}
/**
* {@inheritDoc}
*/
- public void onPostAdd(IoFilterChain parent, String name,
- NextFilter nextFilter) throws Exception {
+ public void onPostAdd(IoSession session, int pos, String name,
+ IoFilter filter) throws Exception {
}
/**
* {@inheritDoc}
*/
- public void onPreRemove(IoFilterChain parent, String name,
- NextFilter nextFilter) throws Exception {
+ public void onPreRemove(IoSession session, int pos, String name,
+ IoFilter filter) throws Exception {
}
/**
* {@inheritDoc}
*/
- public void onPostRemove(IoFilterChain parent, String name,
- NextFilter nextFilter) throws Exception {
+ public void onPostRemove(IoSession session, int pos, String name,
+ IoFilter filter) throws Exception {
}
/**
* {@inheritDoc}
*/
- public void sessionCreated(NextFilter nextFilter, IoSession session)
+ public void sessionCreated(int index, IoSession session)
throws Exception {
- nextFilter.sessionCreated(session);
+ session.getFilter(index).sessionCreated(index+1, session);
}
/**
* {@inheritDoc}
*/
- public void sessionOpened(NextFilter nextFilter, IoSession session)
+ public void sessionOpened(int index, IoSession session)
throws Exception {
- nextFilter.sessionOpened(session);
+ session.getFilter(index).sessionOpened(index+1, session);
}
/**
* {@inheritDoc}
*/
- public void sessionClosed(NextFilter nextFilter, IoSession session)
+ public void sessionClosed(int index, IoSession session)
throws Exception {
- nextFilter.sessionClosed(session);
+ session.getFilter(index).sessionClosed(index+1, session);
}
/**
* {@inheritDoc}
*/
- public void sessionIdle(NextFilter nextFilter, IoSession session,
+ public void sessionIdle(int index, IoSession session,
IdleStatus status) throws Exception {
- nextFilter.sessionIdle(session, status);
+ session.getFilter(index).sessionIdle(index+1, session, status);
}
/**
* {@inheritDoc}
*/
- public void exceptionCaught(NextFilter nextFilter, IoSession session,
+ public void exceptionCaught(int index, IoSession session,
Throwable cause) throws Exception {
- nextFilter.exceptionCaught(session, cause);
+ session.getFilter(index).exceptionCaught(index+1, session, cause);
}
/**
* {@inheritDoc}
*/
- public void messageReceived(NextFilter nextFilter, IoSession session,
+ public void messageReceived(int index, IoSession session,
Object message) throws Exception {
- nextFilter.messageReceived(session, message);
+ session.getFilter(index).messageReceived(index+1, session, message);
}
/**
* {@inheritDoc}
*/
- public void messageSent(NextFilter nextFilter, IoSession session,
+ public void messageSent(int index, IoSession session,
WriteRequest writeRequest) throws Exception {
- nextFilter.messageSent(session, writeRequest);
+ session.getFilter(index).messageSent(index+1, session, writeRequest);
}
/**
* {@inheritDoc}
*/
- public void filterWrite(NextFilter nextFilter, IoSession session,
+ public void filterWrite(int index, IoSession session,
WriteRequest writeRequest) throws Exception {
- nextFilter.filterWrite(session, writeRequest);
+ session.getFilter(index).filterWrite(index+1, session, writeRequest);
}
/**
* {@inheritDoc}
*/
- public void filterClose(NextFilter nextFilter, IoSession session)
+ public void filterClose(int index, IoSession session)
throws Exception {
- nextFilter.filterClose(session);
+ session.getFilter(index).filterClose(index+1, session);
}
/**
* {@inheritDoc}
*/
- public void filterSetTrafficMask(NextFilter nextFilter, IoSession session,
+ public void filterSetTrafficMask(int index, IoSession session,
TrafficMask trafficMask) throws Exception {
- nextFilter.filterSetTrafficMask(session, trafficMask);
+ session.getFilter(index).filterSetTrafficMask(index+1, session, trafficMask);
}
public String toString() {
Modified: mina/branches/mina-new-chain2/core/src/main/java/org/apache/mina/core/filterchain/IoFilterEvent.java
URL: http://svn.apache.org/viewvc/mina/branches/mina-new-chain2/core/src/main/java/org/apache/mina/core/filterchain/IoFilterEvent.java?rev=712028&r1=712027&r2=712028&view=diff
==============================================================================
--- mina/branches/mina-new-chain2/core/src/main/java/org/apache/mina/core/filterchain/IoFilterEvent.java (original)
+++ mina/branches/mina-new-chain2/core/src/main/java/org/apache/mina/core/filterchain/IoFilterEvent.java Thu Nov 6 17:03:23 2008
@@ -19,7 +19,6 @@
*/
package org.apache.mina.core.filterchain;
-import org.apache.mina.core.filterchain.IoFilter.NextFilter;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.core.session.IoEvent;
import org.apache.mina.core.session.IoEventType;
@@ -36,55 +35,50 @@
* @version $Rev: 591770 $, $Date: 2007-11-04 13:22:44 +0100 (Sun, 04 Nov 2007) $
*/
public class IoFilterEvent extends IoEvent {
+ int nextFilterIndex;
- private final NextFilter nextFilter;
-
- public IoFilterEvent(NextFilter nextFilter, IoEventType type,
+ public IoFilterEvent(int nextFilterIndex, IoEventType type,
IoSession session, Object parameter) {
super(type, session, parameter);
- if (nextFilter == null) {
- throw new NullPointerException("nextFilter");
- }
- this.nextFilter = nextFilter;
+ this.nextFilterIndex = nextFilterIndex;
}
- public NextFilter getNextFilter() {
- return nextFilter;
+ public IoFilter getNextFilter() {
+ return getSession().getFilter(nextFilterIndex);
}
- @Override
- public void fire() {
+ public void fire() throws Exception {
switch (getType()) {
case MESSAGE_RECEIVED:
- getNextFilter().messageReceived(getSession(), getParameter());
+ getNextFilter().messageReceived(nextFilterIndex+1, getSession(), getParameter());
break;
case MESSAGE_SENT:
- getNextFilter().messageSent(getSession(), (WriteRequest) getParameter());
+ getNextFilter().messageSent(nextFilterIndex+1, getSession(), (WriteRequest) getParameter());
break;
case WRITE:
- getNextFilter().filterWrite(getSession(), (WriteRequest) getParameter());
+ getNextFilter().filterWrite(nextFilterIndex+1, getSession(), (WriteRequest) getParameter());
break;
case SET_TRAFFIC_MASK:
- getNextFilter().filterSetTrafficMask(getSession(), (TrafficMask) getParameter());
+ getNextFilter().filterSetTrafficMask(nextFilterIndex+1, getSession(), (TrafficMask) getParameter());
break;
case CLOSE:
- getNextFilter().filterClose(getSession());
+ getNextFilter().filterClose(nextFilterIndex+1, getSession());
break;
case EXCEPTION_CAUGHT:
- getNextFilter().exceptionCaught(getSession(), (Throwable) getParameter());
+ getNextFilter().exceptionCaught(nextFilterIndex+1, getSession(), (Throwable) getParameter());
break;
case SESSION_IDLE:
- getNextFilter().sessionIdle(getSession(), (IdleStatus) getParameter());
+ getNextFilter().sessionIdle(nextFilterIndex+1, getSession(), (IdleStatus) getParameter());
break;
case SESSION_OPENED:
- getNextFilter().sessionOpened(getSession());
+ getNextFilter().sessionOpened(nextFilterIndex+1, getSession());
break;
case SESSION_CREATED:
- getNextFilter().sessionCreated(getSession());
+ getNextFilter().sessionCreated(nextFilterIndex+1, getSession());
break;
case SESSION_CLOSED:
- getNextFilter().sessionClosed(getSession());
+ getNextFilter().sessionClosed(nextFilterIndex+1, getSession());
break;
default:
throw new IllegalArgumentException("Unknown event type: " + getType());
Added: mina/branches/mina-new-chain2/core/src/main/java/org/apache/mina/core/filterchain/TailFilter.java
URL: http://svn.apache.org/viewvc/mina/branches/mina-new-chain2/core/src/main/java/org/apache/mina/core/filterchain/TailFilter.java?rev=712028&view=auto
==============================================================================
--- mina/branches/mina-new-chain2/core/src/main/java/org/apache/mina/core/filterchain/TailFilter.java (added)
+++ mina/branches/mina-new-chain2/core/src/main/java/org/apache/mina/core/filterchain/TailFilter.java Thu Nov 6 17:03:23 2008
@@ -0,0 +1,114 @@
+package org.apache.mina.core.filterchain;
+
+import org.apache.mina.core.buffer.IoBuffer;
+import org.apache.mina.core.future.ConnectFuture;
+import org.apache.mina.core.session.AbstractIoSession;
+import org.apache.mina.core.session.IdleStatus;
+import org.apache.mina.core.session.IoSession;
+import org.apache.mina.core.write.WriteRequest;
+
+public class TailFilter extends IoFilterAdapter {
+ @Override
+ public void sessionCreated(int index, IoSession session)
+ throws Exception {
+ try {
+ session.getHandler().sessionCreated(session);
+ } finally {
+ // Notify the related future.
+ ConnectFuture future = (ConnectFuture) session
+ .removeAttribute(AbstractIoSession.SESSION_CREATED_FUTURE);
+ if (future != null) {
+ future.setSession(session);
+ }
+ }
+ }
+
+ @Override
+ public void sessionOpened(int index, IoSession session)
+ throws Exception {
+ session.getHandler().sessionOpened(session);
+ }
+
+ @Override
+ public void sessionClosed(int index, IoSession session)
+ throws Exception {
+ AbstractIoSession s = (AbstractIoSession) session;
+ try {
+ s.getHandler().sessionClosed(session);
+ } finally {
+ try {
+ s.getWriteRequestQueue().dispose(session);
+ } finally {
+ try {
+ s.getAttributeMap().dispose(session);
+ } finally {
+ try {
+ // Remove all filters.
+ session.getFilterChain().clear();
+ } finally {
+ if (s.getConfig().isUseReadOperation()) {
+ s.offerClosedReadFuture();
+ }
+ }
+ }
+ }
+ }
+ }
+
+ @Override
+ public void sessionIdle(int index, IoSession session,
+ IdleStatus status) throws Exception {
+ session.getHandler().sessionIdle(session, status);
+ }
+
+ @Override
+ public void exceptionCaught(int index, IoSession session,
+ Throwable cause) throws Exception {
+ AbstractIoSession s = (AbstractIoSession) session;
+ try {
+ s.getHandler().exceptionCaught(s, cause);
+ } finally {
+ if (s.getConfig().isUseReadOperation()) {
+ s.offerFailedReadFuture(cause);
+ }
+ }
+ }
+
+ @Override
+ public void messageReceived(int index, IoSession session,
+ Object message) throws Exception {
+ AbstractIoSession s = (AbstractIoSession) session;
+ if (!(message instanceof IoBuffer)) {
+ s.increaseReadMessages(System.currentTimeMillis());
+ } else if (!((IoBuffer) message).hasRemaining()) {
+ s.increaseReadMessages(System.currentTimeMillis());
+ }
+
+ try {
+ session.getHandler().messageReceived(s, message);
+ } finally {
+ if (s.getConfig().isUseReadOperation()) {
+ s.offerReadFuture(message);
+ }
+ }
+ }
+
+ @Override
+ public void messageSent(int index, IoSession session,
+ WriteRequest writeRequest) throws Exception {
+ session.getHandler()
+ .messageSent(session, writeRequest.getMessage());
+ }
+
+ @Override
+ public void filterWrite(int index, IoSession session,
+ WriteRequest writeRequest) throws Exception {
+ session.getFilter(index).filterWrite(0, session, writeRequest);
+ }
+
+ @Override
+ public void filterClose(int index, IoSession session)
+ throws Exception {
+ session.getFilter(index).filterClose(0, session);
+ }
+}
Modified: mina/branches/mina-new-chain2/core/src/main/java/org/apache/mina/core/polling/AbstractPollingConnectionlessIoAcceptor.java
URL: http://svn.apache.org/viewvc/mina/branches/mina-new-chain2/core/src/main/java/org/apache/mina/core/polling/AbstractPollingConnectionlessIoAcceptor.java?rev=712028&r1=712027&r2=712028&view=diff
==============================================================================
--- mina/branches/mina-new-chain2/core/src/main/java/org/apache/mina/core/polling/AbstractPollingConnectionlessIoAcceptor.java (original)
+++ mina/branches/mina-new-chain2/core/src/main/java/org/apache/mina/core/polling/AbstractPollingConnectionlessIoAcceptor.java Thu Nov 6 17:03:23 2008
@@ -239,7 +239,8 @@
finishSessionInitialization(session, null, null);
try {
- this.getFilterChainBuilder().buildFilterChain(session.getFilterChain());
+ session.getService().setFilterChainBuilder(
+ session.getFilterChain());
getListeners().fireSessionCreated(session);
} catch (Throwable t) {
ExceptionMonitor.getInstance().exceptionCaught(t);
@@ -277,7 +278,7 @@
}
}
- public void remove(T session) {
+ public void remove(T session) throws Exception {
getSessionRecycler().remove(session);
getListeners().fireSessionDestroyed(session);
}
@@ -415,11 +416,11 @@
newBuf.put(readBuf);
newBuf.flip();
- session.getFilterChain().fireMessageReceived(newBuf);
+ session.getFilter(0).messageReceived(0, session, newBuf);
}
}
- private void flushSessions(long currentTime) {
+ private void flushSessions(long currentTime) throws Exception {
for (; ;) {
T session = flushingSessions.poll();
if (session == null) {
@@ -435,7 +436,7 @@
scheduleFlush(session);
}
} catch (Exception e) {
- session.getFilterChain().fireExceptionCaught(e);
+ session.getFilter(0).exceptionCaught(0, session, e);
}
}
}
@@ -466,7 +467,7 @@
// Clear and fire event
session.setCurrentWriteRequest(null);
buf.reset();
- session.getFilterChain().fireMessageSent(req);
+ session.getFilter(0).messageSent(0, session, req);
continue;
}
@@ -487,7 +488,7 @@
session.setCurrentWriteRequest(null);
writtenBytes += localWrittenBytes;
buf.reset();
- session.getFilterChain().fireMessageSent(req);
+ session.getFilter(0).messageSent(0, session, req);
}
}
} finally {
@@ -567,7 +568,7 @@
return nHandles;
}
- private void notifyIdleSessions(long currentTime) {
+ private void notifyIdleSessions(long currentTime) throws Exception {
// process idle sessions
if (currentTime - lastIdleCheckTime >= 1000) {
lastIdleCheckTime = currentTime;
Modified: mina/branches/mina-new-chain2/core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoProcessor.java
URL: http://svn.apache.org/viewvc/mina/branches/mina-new-chain2/core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoProcessor.java?rev=712028&r1=712027&r2=712028&view=diff
==============================================================================
--- mina/branches/mina-new-chain2/core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoProcessor.java (original)
+++ mina/branches/mina-new-chain2/core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoProcessor.java Thu Nov 6 17:03:23 2008
@@ -33,7 +33,7 @@
import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.core.file.FileRegion;
-import org.apache.mina.core.filterchain.IoFilterChain;
+import org.apache.mina.core.filterchain.IoFilter;
import org.apache.mina.core.future.DefaultIoFuture;
import org.apache.mina.core.service.AbstractIoService;
import org.apache.mina.core.service.IoProcessor;
@@ -384,7 +384,7 @@
wakeup();
}
- private int add() {
+ private int add() throws Exception {
int addedSessions = 0;
// Loop on the new sessions blocking queue, to count
@@ -407,16 +407,16 @@
return addedSessions;
}
- private boolean addNow(T session) {
-
+ private boolean addNow(T session) throws Exception {
boolean registered = false;
boolean notified = false;
+
try {
init(session);
registered = true;
// Build the filter chain of this session.
- session.getService().getFilterChainBuilder().buildFilterChain(
+ session.getService().setFilterChainBuilder(
session.getFilterChain());
// DefaultIoFilterChain.CONNECT_FUTURE is cleared inside here
@@ -428,8 +428,8 @@
// Clear the DefaultIoFilterChain.CONNECT_FUTURE attribute
// and call ConnectFuture.setException().
scheduleRemove(session);
- IoFilterChain filterChain = session.getFilterChain();
- filterChain.fireExceptionCaught(e);
+ IoFilter filter = session.getFilter(0);
+ filter.exceptionCaught(0, session, e);
wakeup();
} else {
ExceptionMonitor.getInstance().exceptionCaught(e);
@@ -445,7 +445,7 @@
return registered;
}
- private int remove() {
+ private int remove() throws Exception {
int removedSessions = 0;
for (; ;) {
T session = removingSessions.poll();
@@ -477,15 +477,15 @@
return removedSessions;
}
- private boolean removeNow(T session) {
+ private boolean removeNow(T session) throws Exception {
clearWriteRequestQueue(session);
try {
destroy(session);
return true;
} catch (Exception e) {
- IoFilterChain filterChain = session.getFilterChain();
- filterChain.fireExceptionCaught(e);
+ IoFilter filter = session.getFilter(0);
+ filter.exceptionCaught(0, session, e);
} finally {
clearWriteRequestQueue(session);
((AbstractIoService) session.getService()).getListeners().fireSessionDestroyed(session);
@@ -493,7 +493,7 @@
return false;
}
- private void clearWriteRequestQueue(T session) {
+ private void clearWriteRequestQueue(T session) throws Exception {
WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue();
WriteRequest req;
@@ -510,8 +510,8 @@
buf.reset();
failedRequests.add(req);
} else {
- IoFilterChain filterChain = session.getFilterChain();
- filterChain.fireMessageSent(req);
+ IoFilter filter = session.getFilter(0);
+ filter.messageSent(0, session, req);
}
} else {
failedRequests.add(req);
@@ -530,8 +530,9 @@
session.decreaseScheduledBytesAndMessages(r);
r.getFuture().setException(cause);
}
- IoFilterChain filterChain = session.getFilterChain();
- filterChain.fireExceptionCaught(cause);
+
+ IoFilter filter = session.getFilter(0);
+ filter.exceptionCaught(0, session, cause);
}
}
@@ -543,7 +544,7 @@
}
}
- private void process(T session) {
+ private void process(T session) throws Exception {
if (isReadable(session) && session.getTrafficMask().isReadable()) {
read(session);
@@ -554,7 +555,7 @@
}
}
- private void read(T session) {
+ private void read(T session) throws Exception {
IoSessionConfig config = session.getConfig();
IoBuffer buf = IoBuffer.allocate(config.getReadBufferSize());
@@ -584,8 +585,8 @@
}
if (readBytes > 0) {
- IoFilterChain filterChain = session.getFilterChain();
- filterChain.fireMessageReceived(buf);
+ IoFilter filter = session.getFilter(0);
+ filter.messageReceived(0, session, buf);
buf = null;
if (hasFragmentation) {
@@ -603,8 +604,9 @@
if (e instanceof IOException) {
scheduleRemove(session);
}
- IoFilterChain filterChain = session.getFilterChain();
- filterChain.fireExceptionCaught(e);
+
+ IoFilter filter = session.getFilter(0);
+ filter.exceptionCaught(0, session, e);
}
}
@@ -616,7 +618,7 @@
}
}
- private void flush(long currentTime) {
+ private void flush(long currentTime) throws Exception {
final T firstSession = flushingSessions.peek();
if (firstSession == null) {
return;
@@ -636,8 +638,8 @@
}
} catch (Exception e) {
scheduleRemove(session);
- IoFilterChain filterChain = session.getFilterChain();
- filterChain.fireExceptionCaught(e);
+ IoFilter filter = session.getFilter(0);
+ filter.exceptionCaught(0, session, e);
}
break;
case CLOSED:
@@ -660,7 +662,7 @@
}
}
- private boolean flushNow(T session, long currentTime) {
+ private boolean flushNow(T session, long currentTime) throws Exception {
if (!session.isConnected()) {
scheduleRemove(session);
return false;
@@ -737,8 +739,8 @@
}
} while (writtenBytes < maxWrittenBytes);
} catch (Exception e) {
- IoFilterChain filterChain = session.getFilterChain();
- filterChain.fireExceptionCaught(e);
+ IoFilter filter = session.getFilter(0);
+ filter.exceptionCaught(0, session, e);
return false;
}
@@ -802,13 +804,13 @@
return localWrittenBytes;
}
- private void fireMessageSent(T session, WriteRequest req) {
+ private void fireMessageSent(T session, WriteRequest req) throws Exception {
session.setCurrentWriteRequest(null);
- IoFilterChain filterChain = session.getFilterChain();
- filterChain.fireMessageSent(req);
+ IoFilter filter = session.getFilter(0);
+ filter.messageSent(0, session, req);
}
- private void updateTrafficMask() {
+ private void updateTrafficMask() throws Exception {
for (; ;) {
T session = trafficControllingSessions.poll();
@@ -835,15 +837,15 @@
}
}
- private void updateTrafficMaskNow(T session) {
+ private void updateTrafficMaskNow(T session) throws Exception {
// The normal is OP_READ and, if there are write requests in the
// session's write queue, set OP_WRITE to trigger flushing.
int mask = session.getTrafficMask().getInterestOps();
try {
setInterestedInRead(session, (mask & SelectionKey.OP_READ) != 0);
} catch (Exception e) {
- IoFilterChain filterChain = session.getFilterChain();
- filterChain.fireExceptionCaught(e);
+ IoFilter filter = session.getFilter(0);
+ filter.exceptionCaught(0, session, e);
}
try {
setInterestedInWrite(
@@ -851,8 +853,8 @@
!session.getWriteRequestQueue().isEmpty(session) &&
(mask & SelectionKey.OP_WRITE) != 0);
} catch (Exception e) {
- IoFilterChain filterChain = session.getFilterChain();
- filterChain.fireExceptionCaught(e);
+ IoFilter filter = session.getFilter(0);
+ filter.exceptionCaught(0, session, e);
}
}
Modified: mina/branches/mina-new-chain2/core/src/main/java/org/apache/mina/core/service/AbstractIoService.java
URL: http://svn.apache.org/viewvc/mina/branches/mina-new-chain2/core/src/main/java/org/apache/mina/core/service/AbstractIoService.java?rev=712028&r1=712027&r2=712028&view=diff
==============================================================================
--- mina/branches/mina-new-chain2/core/src/main/java/org/apache/mina/core/service/AbstractIoService.java (original)
+++ mina/branches/mina-new-chain2/core/src/main/java/org/apache/mina/core/service/AbstractIoService.java Thu Nov 6 17:03:23 2008
@@ -20,6 +20,7 @@
package org.apache.mina.core.service;
import java.util.AbstractSet;
+import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -31,9 +32,8 @@
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.mina.core.IoUtil;
-import org.apache.mina.core.filterchain.DefaultIoFilterChain;
-import org.apache.mina.core.filterchain.DefaultIoFilterChainBuilder;
-import org.apache.mina.core.filterchain.IoFilterChainBuilder;
+import org.apache.mina.core.filterchain.IoFilter;
+import org.apache.mina.core.filterchain.TailFilter;
import org.apache.mina.core.future.ConnectFuture;
import org.apache.mina.core.future.DefaultIoFuture;
import org.apache.mina.core.future.IoFuture;
@@ -126,7 +126,12 @@
/**
* Current filter chain builder.
*/
- private IoFilterChainBuilder filterChainBuilder = new DefaultIoFilterChainBuilder();
+ private List<IoFilter> filterChain = new ArrayList<IoFilter>();
+
+ /**
+ * The terminal filter instance
+ */
+ private static TailFilter tailFilter = new TailFilter();
private IoSessionDataStructureFactory sessionDataStructureFactory = new DefaultIoSessionDataStructureFactory();
@@ -215,30 +220,25 @@
/**
* {@inheritDoc}
*/
- public final IoFilterChainBuilder getFilterChainBuilder() {
- return filterChainBuilder;
+ public final List<IoFilter> getFilterChain() {
+ return filterChain;
}
- /**
- * {@inheritDoc}
- */
- public final void setFilterChainBuilder(IoFilterChainBuilder builder) {
- if (builder == null) {
- builder = new DefaultIoFilterChainBuilder();
- }
- filterChainBuilder = builder;
+ public final IoFilter getTailFilter() {
+ return tailFilter;
}
-
+
/**
* {@inheritDoc}
*/
- public final DefaultIoFilterChainBuilder getFilterChain() {
- if (filterChainBuilder instanceof DefaultIoFilterChainBuilder) {
- return (DefaultIoFilterChainBuilder) filterChainBuilder;
- } else {
- throw new IllegalStateException(
- "Current filter chain builder is not a DefaultIoFilterChainBuilder.");
- }
+ public final void setFilterChainBuilder(List<IoFilter> filters) {
+ if (filters == null) {
+ return;
+ }
+
+ for (IoFilter filter:filters) {
+ filterChain.add(filter);
+ }
}
/**
@@ -497,7 +497,7 @@
if (future != null && future instanceof ConnectFuture) {
// DefaultIoFilterChain will notify the future. (We support ConnectFuture only for now).
- session.setAttribute(DefaultIoFilterChain.SESSION_CREATED_FUTURE,
+ session.setAttribute(AbstractIoSession.SESSION_CREATED_FUTURE,
future);
}
Modified: mina/branches/mina-new-chain2/core/src/main/java/org/apache/mina/core/service/IoProcessor.java
URL: http://svn.apache.org/viewvc/mina/branches/mina-new-chain2/core/src/main/java/org/apache/mina/core/service/IoProcessor.java?rev=712028&r1=712027&r2=712028&view=diff
==============================================================================
--- mina/branches/mina-new-chain2/core/src/main/java/org/apache/mina/core/service/IoProcessor.java (original)
+++ mina/branches/mina-new-chain2/core/src/main/java/org/apache/mina/core/service/IoProcessor.java Thu Nov 6 17:03:23 2008
@@ -66,7 +66,7 @@
* Flushes the internal write request queue of the specified
* {@code session}.
*/
- void flush(T session);
+ void flush(T session) throws Exception ;
/**
* Controls the traffic of the specified {@code session} as specified
@@ -80,5 +80,5 @@
* associated with the {@code session} and releases any other related
* resources.
*/
- void remove(T session);
+ void remove(T session) throws Exception;
}
Modified: mina/branches/mina-new-chain2/core/src/main/java/org/apache/mina/core/service/IoService.java
URL: http://svn.apache.org/viewvc/mina/branches/mina-new-chain2/core/src/main/java/org/apache/mina/core/service/IoService.java?rev=712028&r1=712027&r2=712028&view=diff
==============================================================================
--- mina/branches/mina-new-chain2/core/src/main/java/org/apache/mina/core/service/IoService.java (original)
+++ mina/branches/mina-new-chain2/core/src/main/java/org/apache/mina/core/service/IoService.java Thu Nov 6 17:03:23 2008
@@ -20,13 +20,12 @@
package org.apache.mina.core.service;
import java.util.Collection;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.mina.core.IoUtil;
-import org.apache.mina.core.filterchain.DefaultIoFilterChainBuilder;
-import org.apache.mina.core.filterchain.IoFilterChain;
-import org.apache.mina.core.filterchain.IoFilterChainBuilder;
+import org.apache.mina.core.filterchain.IoFilter;
import org.apache.mina.core.future.WriteFuture;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.core.session.IoSessionConfig;
@@ -114,7 +113,9 @@
* by this service.
* The default value is an empty {@link DefaultIoFilterChainBuilder}.
*/
- IoFilterChainBuilder getFilterChainBuilder();
+ List<IoFilter> getFilterChain();
+
+ IoFilter getTailFilter();
/**
* Sets the {@link IoFilterChainBuilder} which will build the
@@ -123,19 +124,7 @@
* If you specify <tt>null</tt> this property will be set to
* an empty {@link DefaultIoFilterChainBuilder}.
*/
- void setFilterChainBuilder(IoFilterChainBuilder builder);
-
- /**
- * A shortcut for <tt>( ( DefaultIoFilterChainBuilder ) </tt>{@link #getFilterChainBuilder()}<tt> )</tt>.
- * Please note that the returned object is not a <b>real</b> {@link IoFilterChain}
- * but a {@link DefaultIoFilterChainBuilder}. Modifying the returned builder
- * won't affect the existing {@link IoSession}s at all, because
- * {@link IoFilterChainBuilder}s affect only newly created {@link IoSession}s.
- *
- * @throws IllegalStateException if the current {@link IoFilterChainBuilder} is
- * not a {@link DefaultIoFilterChainBuilder}
- */
- DefaultIoFilterChainBuilder getFilterChain();
+ void setFilterChainBuilder(List<IoFilter> builder);
/**
* Returns a value of whether or not this service is active
Modified: mina/branches/mina-new-chain2/core/src/main/java/org/apache/mina/core/service/IoServiceListenerSupport.java
URL: http://svn.apache.org/viewvc/mina/branches/mina-new-chain2/core/src/main/java/org/apache/mina/core/service/IoServiceListenerSupport.java?rev=712028&r1=712027&r2=712028&view=diff
==============================================================================
--- mina/branches/mina-new-chain2/core/src/main/java/org/apache/mina/core/service/IoServiceListenerSupport.java (original)
+++ mina/branches/mina-new-chain2/core/src/main/java/org/apache/mina/core/service/IoServiceListenerSupport.java Thu Nov 6 17:03:23 2008
@@ -32,7 +32,6 @@
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.util.ExceptionMonitor;
-import org.apache.mina.core.filterchain.IoFilterChain;
/**
* A helper which provides addition and removal of {@link IoServiceListener}s and firing
@@ -179,7 +178,7 @@
/**
* Calls {@link IoServiceListener#sessionCreated(IoSession)} for all registered listeners.
*/
- public void fireSessionCreated(IoSession session) {
+ public void fireSessionCreated(IoSession session) throws Exception {
boolean firstSession = false;
if (session.getService() instanceof IoConnector) {
synchronized (managedSessions) {
@@ -198,9 +197,8 @@
}
// Fire session events.
- IoFilterChain filterChain = session.getFilterChain();
- filterChain.fireSessionCreated();
- filterChain.fireSessionOpened();
+ session.getFilter(0).sessionCreated(0, session);
+ session.getFilter(0).sessionOpened(0, session);
int managedSessionCount = managedSessions.size();
if (managedSessionCount > largestManagedSessionCount) {
@@ -221,14 +219,14 @@
/**
* Calls {@link IoServiceListener#sessionDestroyed(IoSession)} for all registered listeners.
*/
- public void fireSessionDestroyed(IoSession session) {
+ public void fireSessionDestroyed(IoSession session) throws Exception {
// Try to remove the remaining empty session set after removal.
if (managedSessions.remove(Long.valueOf(session.getId())) == null) {
return;
}
// Fire session events.
- session.getFilterChain().fireSessionClosed();
+ session.getFilter(0).sessionClosed(0, session);
// Fire listener events.
try {
Modified: mina/branches/mina-new-chain2/core/src/main/java/org/apache/mina/core/service/SimpleIoProcessorPool.java
URL: http://svn.apache.org/viewvc/mina/branches/mina-new-chain2/core/src/main/java/org/apache/mina/core/service/SimpleIoProcessorPool.java?rev=712028&r1=712027&r2=712028&view=diff
==============================================================================
--- mina/branches/mina-new-chain2/core/src/main/java/org/apache/mina/core/service/SimpleIoProcessorPool.java (original)
+++ mina/branches/mina-new-chain2/core/src/main/java/org/apache/mina/core/service/SimpleIoProcessorPool.java Thu Nov 6 17:03:23 2008
@@ -181,11 +181,11 @@
getProcessor(session).add(session);
}
- public final void flush(T session) {
+ public final void flush(T session) throws Exception {
getProcessor(session).flush(session);
}
- public final void remove(T session) {
+ public final void remove(T session) throws Exception {
getProcessor(session).remove(session);
}
Modified: mina/branches/mina-new-chain2/core/src/main/java/org/apache/mina/core/session/AbstractIoSession.java
URL: http://svn.apache.org/viewvc/mina/branches/mina-new-chain2/core/src/main/java/org/apache/mina/core/session/AbstractIoSession.java?rev=712028&r1=712027&r2=712028&view=diff
==============================================================================
--- mina/branches/mina-new-chain2/core/src/main/java/org/apache/mina/core/session/AbstractIoSession.java (original)
+++ mina/branches/mina-new-chain2/core/src/main/java/org/apache/mina/core/session/AbstractIoSession.java Thu Nov 6 17:03:23 2008
@@ -31,8 +31,9 @@
import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.core.file.DefaultFileRegion;
-import org.apache.mina.core.filterchain.IoFilterChain;
+import org.apache.mina.core.filterchain.IoFilter;
import org.apache.mina.core.future.CloseFuture;
+import org.apache.mina.core.future.ConnectFuture;
import org.apache.mina.core.future.DefaultCloseFuture;
import org.apache.mina.core.future.DefaultReadFuture;
import org.apache.mina.core.future.DefaultWriteFuture;
@@ -51,6 +52,8 @@
import org.apache.mina.core.write.WriteToClosedSessionException;
import org.apache.mina.util.CircularQueue;
import org.apache.mina.util.ExceptionMonitor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
@@ -60,6 +63,8 @@
* @version $Rev$, $Date$
*/
public abstract class AbstractIoSession implements IoSession {
+ /** The logger for this class */
+ private final Logger logger = LoggerFactory.getLogger(getClass());
private static final AttributeKey READY_READ_FUTURES_KEY =
new AttributeKey(AbstractIoSession.class, "readyReadFutures");
@@ -67,6 +72,15 @@
private static final AttributeKey WAITING_READ_FUTURES_KEY =
new AttributeKey(AbstractIoSession.class, "waitingReadFutures");
+ /**
+ * A session attribute that stores the {@link ConnectFuture} related to
+ * the {@link IoSession}. The attribute is cleared and the future is
+ * notified when the session is created or when an exception is caught
+ * (see {@code exceptionCaught(Throwable)} in this class).
+ */
+ public static final AttributeKey SESSION_CREATED_FUTURE = new AttributeKey(
+ AbstractIoSession.class, "connectFuture");
+
private static final IoFutureListener<CloseFuture> SCHEDULED_COUNTER_RESETTER =
new IoFutureListener<CloseFuture>() {
public void operationComplete(CloseFuture future) {
@@ -212,7 +226,7 @@
/**
* {@inheritDoc}
*/
- public final CloseFuture close(boolean rightNow) {
+ public final CloseFuture close(boolean rightNow) throws Exception {
if (rightNow) {
return close();
} else {
@@ -220,6 +234,25 @@
}
}
+ private void exceptionCaught(Throwable cause) {
+ // Notify the related future.
+ ConnectFuture future = (ConnectFuture)removeAttribute(SESSION_CREATED_FUTURE);
+
+ if (future == null) {
+ try {
+ getFilter(0).exceptionCaught(0, this, cause);
+ } catch (Throwable t) {
+ logger.warn(
+ "Unexpected exception from exceptionCaught handler.", t);
+ }
+ } else {
+ // Please note that this place is not the only place that
+ // calls ConnectFuture.setException().
+ close();
+ future.setException(cause);
+ }
+ }
+
/**
* {@inheritDoc}
*/
@@ -232,14 +265,19 @@
}
}
- getFilterChain().fireFilterClose();
+ try {
+ getFilter(0).filterClose(0, this);
+ } catch (Throwable e) {
+ exceptionCaught(e);
+ }
+
return closeFuture;
}
/**
* {@inheritDoc}
*/
- public final CloseFuture closeOnFlush() {
+ public final CloseFuture closeOnFlush() throws Exception {
getWriteRequestQueue().offer(this, CLOSE_REQUEST);
getProcessor().flush(this);
return closeFuture;
@@ -412,8 +450,14 @@
WriteRequest writeRequest = new DefaultWriteRequest(message, writeFuture, remoteAddress);
// Then, get the chain and inject the WriteRequest into it
- IoFilterChain filterChain = getFilterChain();
- filterChain.fireFilterWrite(writeRequest);
+ IoFilter filter = getFilter(0);
+
+ try {
+ filter.filterWrite(getFilterChain().size(), this, writeRequest);
+ } catch (Throwable t) {
+ writeRequest.getFuture().setException(t);
+ exceptionCaught(t);
+ }
// TODO : This is not our business ! The caller has created a FileChannel,
// he has to close it !
@@ -567,7 +611,11 @@
return;
}
- getFilterChain().fireFilterSetTrafficMask(trafficMask);
+ try {
+ getFilter(0).filterSetTrafficMask(0, this, trafficMask);
+ } catch (Throwable t) {
+ exceptionCaught(t);
+ }
}
/**
Modified: mina/branches/mina-new-chain2/core/src/main/java/org/apache/mina/core/session/DummySession.java
URL: http://svn.apache.org/viewvc/mina/branches/mina-new-chain2/core/src/main/java/org/apache/mina/core/session/DummySession.java?rev=712028&r1=712027&r2=712028&view=diff
==============================================================================
--- mina/branches/mina-new-chain2/core/src/main/java/org/apache/mina/core/session/DummySession.java (original)
+++ mina/branches/mina-new-chain2/core/src/main/java/org/apache/mina/core/session/DummySession.java Thu Nov 6 17:03:23 2008
@@ -21,14 +21,13 @@
import java.io.IOException;
import java.net.SocketAddress;
+import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Executor;
import org.apache.mina.core.file.FileRegion;
-import org.apache.mina.core.filterchain.DefaultIoFilterChain;
import org.apache.mina.core.filterchain.IoFilter;
-import org.apache.mina.core.filterchain.IoFilterChain;
import org.apache.mina.core.future.IoFuture;
import org.apache.mina.core.service.AbstractIoAcceptor;
import org.apache.mina.core.service.DefaultTransportMetadata;
@@ -77,7 +76,7 @@
}
};
- private final IoFilterChain filterChain = new DefaultIoFilterChain(this);
+ private final List<IoFilter> filterChain = new ArrayList<IoFilter>();
private final IoProcessor<IoSession> processor;
private volatile IoHandler handler = new IoHandlerAdapter();
@@ -132,7 +131,7 @@
public void add(IoSession session) {
}
- public void flush(IoSession session) {
+ public void flush(IoSession session) throws Exception {
DummySession s = (DummySession) session;
WriteRequest req = s.getWriteRequestQueue().poll(session);
Object m = req.getMessage();
@@ -142,10 +141,10 @@
file.getFileChannel().position(file.getPosition() + file.getRemainingBytes());
file.update(file.getRemainingBytes());
} catch (IOException e) {
- s.getFilterChain().fireExceptionCaught(e);
+ s.getFilter(0).exceptionCaught(0, session, e);
}
}
- getFilterChain().fireMessageSent(req);
+ getFilter(0).messageSent(0, session, req);
}
public void remove(IoSession session) {
@@ -190,10 +189,14 @@
this.config = config;
}
- public IoFilterChain getFilterChain() {
+ public List<IoFilter> getFilterChain() {
return filterChain;
}
+ public IoFilter getFilter(int index) {
+ return filterChain.get(index);
+ }
+
public IoHandler getHandler() {
return handler;
}
Modified: mina/branches/mina-new-chain2/core/src/main/java/org/apache/mina/core/session/IdleStatusChecker.java
URL: http://svn.apache.org/viewvc/mina/branches/mina-new-chain2/core/src/main/java/org/apache/mina/core/session/IdleStatusChecker.java?rev=712028&r1=712027&r2=712028&view=diff
==============================================================================
--- mina/branches/mina-new-chain2/core/src/main/java/org/apache/mina/core/session/IdleStatusChecker.java (original)
+++ mina/branches/mina-new-chain2/core/src/main/java/org/apache/mina/core/session/IdleStatusChecker.java Thu Nov 6 17:03:23 2008
@@ -90,6 +90,8 @@
// will exit the loop if interrupted from interrupt()
}
}
+ } catch (Exception e) {
+ thread = null;
} finally {
thread = null;
}
@@ -103,7 +105,7 @@
}
}
- private void notifyServices(long currentTime) {
+ private void notifyServices(long currentTime) throws Exception {
Iterator<AbstractIoService> it = services.iterator();
while (it.hasNext()) {
AbstractIoService service = it.next();
@@ -113,7 +115,7 @@
}
}
- private void notifySessions(long currentTime) {
+ private void notifySessions(long currentTime) throws Exception {
Iterator<AbstractIoSession> it = sessions.iterator();
while (it.hasNext()) {
AbstractIoSession session = it.next();
@@ -136,7 +138,8 @@
*
* @param currentTime the current time (i.e. {@link System#currentTimeMillis()})
*/
- public static void notifyIdleness(Iterator<? extends IoSession> sessions, long currentTime) {
+ public static void notifyIdleness(Iterator<? extends IoSession> sessions,
+ long currentTime) throws Exception {
IoSession s = null;
while (sessions.hasNext()) {
s = sessions.next();
@@ -144,11 +147,13 @@
}
}
- public static void notifyIdleness(IoService service, long currentTime) {
+ public static void notifyIdleness(IoService service, long currentTime)
+ throws Exception {
notifyIdleness(service, currentTime, true);
}
- private static void notifyIdleness(IoService service, long currentTime, boolean includeSessions) {
+ private static void notifyIdleness(IoService service, long currentTime,
+ boolean includeSessions) throws Exception {
if (!(service instanceof AbstractIoService)) {
return;
}
@@ -166,7 +171,8 @@
*
* @param currentTime the current time (i.e. {@link System#currentTimeMillis()})
*/
- public static void notifyIdleSession(IoSession session, long currentTime) {
+ public static void notifyIdleSession(IoSession session, long currentTime)
+ throws Exception {
notifyIdleSession0(
session, currentTime,
session.getConfig().getIdleTimeInMillis(IdleStatus.BOTH_IDLE),
@@ -194,15 +200,15 @@
private static void notifyIdleSession0(
IoSession session, long currentTime,
- long idleTime, IdleStatus status, long lastIoTime) {
+ long idleTime, IdleStatus status, long lastIoTime) throws Exception {
if (idleTime > 0 && lastIoTime != 0
&& currentTime - lastIoTime >= idleTime) {
- session.getFilterChain().fireSessionIdle(status);
+ session.getFilter(0).sessionIdle(0, session, status);
}
}
private static void notifyWriteTimeout(
- IoSession session, long currentTime) {
+ IoSession session, long currentTime) throws Exception {
long writeTimeout = session.getConfig().getWriteTimeoutInMillis();
if (writeTimeout > 0 &&
@@ -213,7 +219,7 @@
session.setCurrentWriteRequest(null);
WriteTimeoutException cause = new WriteTimeoutException(request);
request.getFuture().setException(cause);
- session.getFilterChain().fireExceptionCaught(cause);
+ session.getFilter(0).exceptionCaught(0, session, cause);
// WriteException is an IOException, so we close the session.
session.close();
}
Modified: mina/branches/mina-new-chain2/core/src/main/java/org/apache/mina/core/session/IoEvent.java
URL: http://svn.apache.org/viewvc/mina/branches/mina-new-chain2/core/src/main/java/org/apache/mina/core/session/IoEvent.java?rev=712028&r1=712027&r2=712028&view=diff
==============================================================================
--- mina/branches/mina-new-chain2/core/src/main/java/org/apache/mina/core/session/IoEvent.java (original)
+++ mina/branches/mina-new-chain2/core/src/main/java/org/apache/mina/core/session/IoEvent.java Thu Nov 6 17:03:23 2008
@@ -61,44 +61,44 @@
}
public void run() {
- fire();
- }
-
- public void fire() {
- switch (getType()) {
- case MESSAGE_RECEIVED:
- getSession().getFilterChain().fireMessageReceived(getParameter());
- break;
- case MESSAGE_SENT:
- getSession().getFilterChain().fireMessageSent((WriteRequest) getParameter());
- break;
- case WRITE:
- getSession().getFilterChain().fireFilterWrite((WriteRequest) getParameter());
- break;
- case SET_TRAFFIC_MASK:
- getSession().getFilterChain().fireFilterSetTrafficMask((TrafficMask) getParameter());
- break;
- case CLOSE:
- getSession().getFilterChain().fireFilterClose();
- break;
- case EXCEPTION_CAUGHT:
- getSession().getFilterChain().fireExceptionCaught((Throwable) getParameter());
- break;
- case SESSION_IDLE:
- getSession().getFilterChain().fireSessionIdle((IdleStatus) getParameter());
- break;
- case SESSION_OPENED:
- getSession().getFilterChain().fireSessionOpened();
- break;
- case SESSION_CREATED:
- getSession().getFilterChain().fireSessionCreated();
- break;
- case SESSION_CLOSED:
- getSession().getFilterChain().fireSessionClosed();
- break;
- default:
- throw new IllegalArgumentException("Unknown event type: " + getType());
- }
+ try {
+ switch (type) {
+ case MESSAGE_RECEIVED:
+ session.getFilter(0).messageReceived(0, session, getParameter());
+ break;
+ case MESSAGE_SENT:
+ session.getFilter(0).messageSent(0, session, (WriteRequest) getParameter());
+ break;
+ case WRITE:
+ session.getFilter(0).filterWrite(0, session, (WriteRequest) getParameter());
+ break;
+ case SET_TRAFFIC_MASK:
+ session.getFilter(0).filterSetTrafficMask(0, session, (TrafficMask) getParameter());
+ break;
+ case CLOSE:
+ session.getFilter(0).filterClose(0, session);
+ break;
+ case EXCEPTION_CAUGHT:
+ session.getFilter(0).exceptionCaught(0, session, (Throwable) getParameter());
+ break;
+ case SESSION_IDLE:
+ session.getFilter(0).sessionIdle(0, session, (IdleStatus) getParameter());
+ break;
+ case SESSION_OPENED:
+ session.getFilter(0).sessionOpened(0, session);
+ break;
+ case SESSION_CREATED:
+ session.getFilter(0).sessionCreated(0, session);
+ break;
+ case SESSION_CLOSED:
+ session.getFilter(0).sessionClosed(0, session);
+ break;
+ default:
+ throw new IllegalArgumentException("Unknown event type: " + getType());
+ }
+ } catch (Exception e) {
+ // TODO : handle the exception
+ }
}
@Override
Modified: mina/branches/mina-new-chain2/core/src/main/java/org/apache/mina/core/session/IoSession.java
URL: http://svn.apache.org/viewvc/mina/branches/mina-new-chain2/core/src/main/java/org/apache/mina/core/session/IoSession.java?rev=712028&r1=712027&r2=712028&view=diff
==============================================================================
--- mina/branches/mina-new-chain2/core/src/main/java/org/apache/mina/core/session/IoSession.java (original)
+++ mina/branches/mina-new-chain2/core/src/main/java/org/apache/mina/core/session/IoSession.java Thu Nov 6 17:03:23 2008
@@ -20,10 +20,10 @@
package org.apache.mina.core.session;
import java.net.SocketAddress;
+import java.util.List;
import java.util.Set;
import org.apache.mina.core.filterchain.IoFilter;
-import org.apache.mina.core.filterchain.IoFilterChain;
import org.apache.mina.core.future.CloseFuture;
import org.apache.mina.core.future.ReadFuture;
import org.apache.mina.core.future.WriteFuture;
@@ -96,7 +96,16 @@
/**
* @return the filter chain that only affects this session.
*/
- IoFilterChain getFilterChain();
+ List<IoFilter> getFilterChain();
+
+
+ /**
+ * @return the filter at the given position in the chain
+ * that only affects this session.
+ *
+ * @param index the position in the chain
+ */
+ IoFilter getFilter(int index);
/**
@@ -172,7 +181,7 @@
* This operation is asynchronous. Wait for the returned {@link CloseFuture}
* if you want to wait for the session actually closed.
*/
- CloseFuture closeOnFlush();
+ CloseFuture closeOnFlush() throws Exception;
/**
* Closes this session immediately or after all queued write requests
@@ -184,7 +193,7 @@
* {@code false} to close this session after all queued
* write requests are flushed (i.e. {@link #closeOnFlush()}).
*/
- CloseFuture close(boolean immediately);
+ CloseFuture close(boolean immediately) throws Exception;
/**
* Returns an attachment of this session.
Modified: mina/branches/mina-new-chain2/core/src/main/java/org/apache/mina/filter/buffer/BufferedWriteFilter.java
URL: http://svn.apache.org/viewvc/mina/branches/mina-new-chain2/core/src/main/java/org/apache/mina/filter/buffer/BufferedWriteFilter.java?rev=712028&r1=712027&r2=712028&view=diff
==============================================================================
--- mina/branches/mina-new-chain2/core/src/main/java/org/apache/mina/filter/buffer/BufferedWriteFilter.java (original)
+++ mina/branches/mina-new-chain2/core/src/main/java/org/apache/mina/filter/buffer/BufferedWriteFilter.java Thu Nov 6 17:03:23 2008
@@ -23,6 +23,7 @@
import java.util.concurrent.ConcurrentHashMap;
import org.apache.mina.core.buffer.IoBuffer;
+import org.apache.mina.core.filterchain.ChainEntry;
import org.apache.mina.core.filterchain.IoFilter;
import org.apache.mina.core.filterchain.IoFilterAdapter;
import org.apache.mina.core.session.IoSession;
@@ -127,7 +128,7 @@
* {@link IoBuffer} instance.
*/
@Override
- public void filterWrite(NextFilter nextFilter, IoSession session,
+ public void filterWrite(ChainEntry nextEntry, IoSession session,
WriteRequest writeRequest) throws Exception {
Object data = writeRequest.getMessage();
@@ -163,7 +164,7 @@
* @param data the data to buffer
* @param buf the buffer where data will be temporarily written
*/
- private void write(IoSession session, IoBuffer data, IoBuffer buf) {
+ private void write(ChainEntry entry, IoSession session, IoBuffer data, IoBuffer buf) {
try {
int len = data.remaining();
if (len >= buf.capacity()) {
@@ -171,9 +172,9 @@
* If the request length exceeds the size of the output buffer,
* flush the output buffer and then write the data directly.
*/
- NextFilter nextFilter = session.getFilterChain().getNextFilter(
+ IoFilter nextFilter = session.getFilterChain().getNextFilter(
this);
- internalFlush(nextFilter, session, buf);
+ internalFlush(entry, session, buf);
nextFilter.filterWrite(session, new DefaultWriteRequest(data));
return;
}
@@ -197,7 +198,7 @@
* @param buf the data to write
* @throws Exception if a write operation fails
*/
- private void internalFlush(NextFilter nextFilter, IoSession session,
+ private void internalFlush(ChainEntry entry, IoSession session,
IoBuffer buf) throws Exception {
IoBuffer tmp = null;
synchronized (buf) {
@@ -206,7 +207,9 @@
buf.clear();
}
logger.debug("Flushing buffer: {}", tmp);
- nextFilter.filterWrite(session, new DefaultWriteRequest(tmp));
+ ChainEntry nextEntry = entry.getNextEntry();
+ IoFilter nextFilter = nextEntry.getFilter();
+ nextFilter.filterWrite(nextEntry, session, new DefaultWriteRequest(tmp));
}
/**
Modified: mina/branches/mina-new-chain2/core/src/main/java/org/apache/mina/filter/codec/ProtocolCodecFilter.java
URL: http://svn.apache.org/viewvc/mina/branches/mina-new-chain2/core/src/main/java/org/apache/mina/filter/codec/ProtocolCodecFilter.java?rev=712028&r1=712027&r2=712028&view=diff
==============================================================================
--- mina/branches/mina-new-chain2/core/src/main/java/org/apache/mina/filter/codec/ProtocolCodecFilter.java (original)
+++ mina/branches/mina-new-chain2/core/src/main/java/org/apache/mina/filter/codec/ProtocolCodecFilter.java Thu Nov 6 17:03:23 2008
@@ -20,13 +20,13 @@
package org.apache.mina.filter.codec;
import java.net.SocketAddress;
+import java.util.List;
import java.util.Queue;
import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.core.file.FileRegion;
import org.apache.mina.core.filterchain.IoFilter;
import org.apache.mina.core.filterchain.IoFilterAdapter;
-import org.apache.mina.core.filterchain.IoFilterChain;
import org.apache.mina.core.future.DefaultWriteFuture;
import org.apache.mina.core.future.WriteFuture;
import org.apache.mina.core.session.AttributeKey;
@@ -38,6 +38,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
/**
* An {@link IoFilter} which translates binary or protocol specific data into
* message object and vice versa using {@link ProtocolCodecFactory},
@@ -179,24 +180,26 @@
}
@Override
- public void onPreAdd(IoFilterChain parent, String name,
- NextFilter nextFilter) throws Exception {
- if (parent.contains(this)) {
+ public void onPreAdd(IoSession session, int index,
+ String name, IoFilter nextFilter) throws Exception {
+ List<IoFilter> filters = session.getFilterChain();
+
+ if (filters.contains(this)) {
throw new IllegalArgumentException(
"You can't add the same filter instance more than once. Create another instance and add it.");
}
}
@Override
- public void onPostRemove(IoFilterChain parent, String name,
- NextFilter nextFilter) throws Exception {
+ public void onPostRemove(IoSession session, int index, String name,
+ IoFilter nextFilter) throws Exception {
// We just remove the two instances of encoder/decoder to release resources
// from the session
- disposeEncoder(parent.getSession());
- disposeDecoder(parent.getSession());
+ disposeEncoder(session);
+ disposeDecoder(session);
// We also remove the callback
- disposeDecoderOut(parent.getSession());
+ disposeDecoderOut(session);
}
/**
@@ -212,10 +215,12 @@
*
*/
@Override
- public void messageReceived(NextFilter nextFilter, IoSession session,
- Object message) throws Exception {
+ public void messageReceived(int index, IoSession session,
+ Object message) throws Exception {
+ IoFilter nextFilter = session.getFilter(index);
+
if (!(message instanceof IoBuffer)) {
- nextFilter.messageReceived(session, message);
+ nextFilter.messageReceived(index+1, session, message);
return;
}
@@ -230,7 +235,7 @@
ProtocolDecoderException pde = new ProtocolDecoderException(
"Cannot decode if the decoder is null. Add the filter in the chain" +
"before the first session is created" );
- nextFilter.exceptionCaught(session, pde);
+ nextFilter.exceptionCaught(0, session, pde);
return;
}
@@ -244,7 +249,7 @@
ProtocolDecoderException pde = new ProtocolDecoderException(
"Cannot decode if the decoder is null. Add the filter in the chain" +
"before the first session is created" );
- nextFilter.exceptionCaught(session, pde);
+ nextFilter.exceptionCaught(0, session, pde);
return;
}
@@ -281,7 +286,7 @@
// Fire the exceptionCaught event.
decoderOut.flush();
- nextFilter.exceptionCaught(session, pde);
+ nextFilter.exceptionCaught(0, session, pde);
// Retry only if the type of the caught exception is
// recoverable and the buffer position has changed.
@@ -296,30 +301,34 @@
}
@Override
- public void messageSent(NextFilter nextFilter, IoSession session,
+ public void messageSent(int index, IoSession session,
WriteRequest writeRequest) throws Exception {
- if (writeRequest instanceof EncodedWriteRequest) {
+ IoFilter nextFilter = session.getFilter(index);
+
+ if (writeRequest instanceof EncodedWriteRequest) {
return;
}
if (!(writeRequest instanceof MessageWriteRequest)) {
- nextFilter.messageSent(session, writeRequest);
+ nextFilter.messageSent(index+1, session, writeRequest);
return;
}
MessageWriteRequest wrappedRequest = (MessageWriteRequest) writeRequest;
- nextFilter.messageSent(session, wrappedRequest.getParentRequest());
+ nextFilter.messageSent(index+1, session, wrappedRequest.getParentRequest());
}
@Override
- public void filterWrite(NextFilter nextFilter, IoSession session,
+ public void filterWrite(int index, IoSession session,
WriteRequest writeRequest) throws Exception {
- Object message = writeRequest.getMessage();
+ IoFilter nextFilter = session.getFilter(index);
+
+ Object message = writeRequest.getMessage();
// Bypass the encoding if the message is contained in a ByteBuffer,
// as it has already been encoded before
if (message instanceof IoBuffer || message instanceof FileRegion) {
- nextFilter.filterWrite(session, writeRequest);
+ nextFilter.filterWrite(index+1, session, writeRequest);
return;
}
@@ -334,7 +343,7 @@
ProtocolDecoderException pde = new ProtocolDecoderException(
"Cannot encode if the encoder is null. Add the filter in the chain" +
"before the first session is created" );
- nextFilter.exceptionCaught(session, pde);
+ nextFilter.exceptionCaught(0, session, pde);
return;
}
@@ -349,7 +358,7 @@
ProtocolDecoderException pde = new ProtocolDecoderException(
"Cannot encode if the encoder is null. Add the filter in the chain" +
"before the first session is created" );
- nextFilter.exceptionCaught(session, pde);
+ nextFilter.exceptionCaught(0, session, pde);
return;
}
@@ -361,7 +370,7 @@
encoderOut.flushWithoutFuture();
// Call the next filter
- nextFilter.filterWrite(session, new MessageWriteRequest(
+ nextFilter.filterWrite(index+1, session, new MessageWriteRequest(
writeRequest));
} catch (Throwable t) {
ProtocolEncoderException pee;
@@ -389,8 +398,10 @@
* @throws Exception if we can't create instances of the decoder or encoder
*/
@Override
- public void sessionCreated(NextFilter nextFilter, IoSession session) throws Exception {
- // Creates the decoder and stores it into the newly created session
+ public void sessionCreated(int index, IoSession session) throws Exception {
+ IoFilter nextFilter = session.getFilter(index);
+
+ // Creates the decoder and stores it into the newly created session
ProtocolDecoder decoder = factory.getDecoder(session);
session.setAttribute(DECODER, decoder);
@@ -399,13 +410,15 @@
session.setAttribute(ENCODER, encoder);
// Call the next filter
- nextFilter.sessionCreated(session);
+ nextFilter.sessionCreated(index+1, session);
}
@Override
- public void sessionClosed(NextFilter nextFilter, IoSession session)
+ public void sessionClosed(int index, IoSession session)
throws Exception {
- // Call finishDecode() first when a connection is closed.
+ IoFilter nextFilter = session.getFilter(index);
+
+ // Call finishDecode() first when a connection is closed.
ProtocolDecoder decoder = getDecoder(session);
if ( decoder == null) {
@@ -416,7 +429,7 @@
ProtocolDecoderException pde = new ProtocolDecoderException(
"Cannot decode if the decoder is null. Add the filter in the chain" +
"before the first session is created" );
- nextFilter.exceptionCaught(session, pde);
+ nextFilter.exceptionCaught(0, session, pde);
return;
}
@@ -430,7 +443,7 @@
ProtocolDecoderException pde = new ProtocolDecoderException(
"Cannot decode if the decoder is null. Add the filter in the chain" +
"before the first session is created" );
- nextFilter.exceptionCaught(session, pde);
+ nextFilter.exceptionCaught(0, session, pde);
return;
}
@@ -452,16 +465,16 @@
decoderOut.flush();
}
- nextFilter.sessionClosed(session);
+ nextFilter.sessionClosed(index+1, session);
}
private ProtocolEncoderOutputImpl getEncoderOut(IoSession session,
- NextFilter nextFilter, WriteRequest writeRequest) {
+ IoFilter nextFilter, WriteRequest writeRequest) {
return new ProtocolEncoderOutputImpl(session, nextFilter, writeRequest);
}
private ProtocolDecoderOutput getDecoderOut(IoSession session,
- NextFilter nextFilter) {
+ IoFilter nextFilter) {
ProtocolDecoderOutput out = (ProtocolDecoderOutput) session.getAttribute(DECODER_OUT);
if (out == null) {
out = new ProtocolDecoderOutputImpl(session, nextFilter);
@@ -525,18 +538,18 @@
private static class ProtocolDecoderOutputImpl extends
AbstractProtocolDecoderOutput {
private final IoSession session;
- private final NextFilter nextFilter;
+ private final IoFilter nextFilter;
public ProtocolDecoderOutputImpl(
- IoSession session, NextFilter nextFilter) {
+ IoSession session, IoFilter nextFilter) {
this.session = session;
this.nextFilter = nextFilter;
}
- public void flush() {
+ public void flush() throws Exception {
Queue<Object> messageQueue = getMessageQueue();
while (!messageQueue.isEmpty()) {
- nextFilter.messageReceived(session, messageQueue.poll());
+ nextFilter.messageReceived(0, session, messageQueue.poll());
}
}
}
@@ -545,18 +558,18 @@
AbstractProtocolEncoderOutput {
private final IoSession session;
- private final NextFilter nextFilter;
+ private final IoFilter nextFilter;
private final WriteRequest writeRequest;
public ProtocolEncoderOutputImpl(IoSession session,
- NextFilter nextFilter, WriteRequest writeRequest) {
+ IoFilter nextFilter, WriteRequest writeRequest) {
this.session = session;
this.nextFilter = nextFilter;
this.writeRequest = writeRequest;
}
- public WriteFuture flush() {
+ public WriteFuture flush() throws Exception {
Queue<Object> bufferQueue = getMessageQueue();
WriteFuture future = null;
for (;;) {
@@ -569,7 +582,7 @@
if (!(encodedMessage instanceof IoBuffer) ||
((IoBuffer) encodedMessage).hasRemaining()) {
future = new DefaultWriteFuture(session);
- nextFilter.filterWrite(session, new EncodedWriteRequest(encodedMessage,
+ nextFilter.filterWrite(0, session, new EncodedWriteRequest(encodedMessage,
future, writeRequest.getDestination()));
}
}
@@ -582,7 +595,7 @@
return future;
}
- public void flushWithoutFuture() {
+ public void flushWithoutFuture() throws Exception {
Queue<Object> bufferQueue = getMessageQueue();
for (;;) {
Object encodedMessage = bufferQueue.poll();
@@ -596,7 +609,7 @@
SocketAddress destination = writeRequest.getDestination();
WriteRequest writeRequest = new EncodedWriteRequest(
encodedMessage, null, destination);
- nextFilter.filterWrite(session, writeRequest);
+ nextFilter.filterWrite(0, session, writeRequest);
}
}
}
Modified: mina/branches/mina-new-chain2/core/src/main/java/org/apache/mina/filter/codec/ProtocolDecoderOutput.java
URL: http://svn.apache.org/viewvc/mina/branches/mina-new-chain2/core/src/main/java/org/apache/mina/filter/codec/ProtocolDecoderOutput.java?rev=712028&r1=712027&r2=712028&view=diff
==============================================================================
--- mina/branches/mina-new-chain2/core/src/main/java/org/apache/mina/filter/codec/ProtocolDecoderOutput.java (original)
+++ mina/branches/mina-new-chain2/core/src/main/java/org/apache/mina/filter/codec/ProtocolDecoderOutput.java Thu Nov 6 17:03:23 2008
@@ -41,5 +41,5 @@
* Flushes all messages you wrote via {@link #write(Object)} to
* the next filter.
*/
- void flush();
+ void flush() throws Exception;
}
Modified: mina/branches/mina-new-chain2/core/src/main/java/org/apache/mina/filter/codec/ProtocolEncoderOutput.java
URL: http://svn.apache.org/viewvc/mina/branches/mina-new-chain2/core/src/main/java/org/apache/mina/filter/codec/ProtocolEncoderOutput.java?rev=712028&r1=712027&r2=712028&view=diff
==============================================================================
--- mina/branches/mina-new-chain2/core/src/main/java/org/apache/mina/filter/codec/ProtocolEncoderOutput.java (original)
+++ mina/branches/mina-new-chain2/core/src/main/java/org/apache/mina/filter/codec/ProtocolEncoderOutput.java Thu Nov 6 17:03:23 2008
@@ -61,5 +61,5 @@
*
* @return <tt>null</tt> if there is nothing to flush at all.
*/
- WriteFuture flush();
+ WriteFuture flush() throws Exception;
}
\ No newline at end of file
Modified: mina/branches/mina-new-chain2/core/src/main/java/org/apache/mina/filter/logging/LoggingFilter.java
URL: http://svn.apache.org/viewvc/mina/branches/mina-new-chain2/core/src/main/java/org/apache/mina/filter/logging/LoggingFilter.java?rev=712028&r1=712027&r2=712028&view=diff
==============================================================================
--- mina/branches/mina-new-chain2/core/src/main/java/org/apache/mina/filter/logging/LoggingFilter.java (original)
+++ mina/branches/mina-new-chain2/core/src/main/java/org/apache/mina/filter/logging/LoggingFilter.java Thu Nov 6 17:03:23 2008
@@ -168,51 +168,58 @@
}
@Override
- public void exceptionCaught(NextFilter nextFilter, IoSession session,
+ public void exceptionCaught(int index, IoSession session,
Throwable cause) throws Exception {
log(exceptionCaughtLevel, "EXCEPTION :", cause);
- nextFilter.exceptionCaught(session, cause);
+ IoFilter nextFilter = session.getFilter(index);
+ nextFilter.exceptionCaught(index+1, session, cause);
}
@Override
- public void messageReceived(NextFilter nextFilter, IoSession session,
+ public void messageReceived(int index, IoSession session,
Object message) throws Exception {
log(messageReceivedLevel, "RECEIVED: {}", message );
- nextFilter.messageReceived(session, message);
+ IoFilter nextFilter = session.getFilter(index);
+ nextFilter.messageReceived(index+1, session, message);
}
@Override
- public void messageSent(NextFilter nextFilter, IoSession session,
+ public void messageSent(int index, IoSession session,
WriteRequest writeRequest) throws Exception {
log(messageSentLevel, "SENT: {}", writeRequest.getMessage() );
- nextFilter.messageSent(session, writeRequest);
+ IoFilter nextFilter = session.getFilter(index);
+ nextFilter.messageSent(index+1, session, writeRequest);
}
@Override
- public void sessionCreated(NextFilter nextFilter, IoSession session)
+ public void sessionCreated(int index, IoSession session)
throws Exception {
log(sessionCreatedLevel, "CREATED");
- nextFilter.sessionCreated(session);
+ IoFilter nextFilter = session.getFilter(index);
+ nextFilter.sessionCreated(index+1, session);
}
@Override
- public void sessionOpened(NextFilter nextFilter, IoSession session)
+ public void sessionOpened(int index, IoSession session)
throws Exception {
log(sessionOpenedLevel, "OPENED");
- nextFilter.sessionOpened(session);
+ IoFilter nextFilter = session.getFilter(index);
+ nextFilter.sessionOpened(index+1, session);
}
@Override
- public void sessionIdle(NextFilter nextFilter, IoSession session,
+ public void sessionIdle(int index, IoSession session,
IdleStatus status) throws Exception {
log(sessionIdleLevel, "IDLE");
- nextFilter.sessionIdle(session, status);
+ IoFilter nextFilter = session.getFilter(index);
+ nextFilter.sessionIdle(index+1, session, status);
}
@Override
- public void sessionClosed(NextFilter nextFilter, IoSession session) throws Exception {
+ public void sessionClosed(int index, IoSession session) throws Exception {
log(sessionClosedLevel, "CLOSED");
- nextFilter.sessionClosed(session);
+ IoFilter nextFilter = session.getFilter(index);
+ nextFilter.sessionClosed(index+1, session);
}
/**