You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by tr...@apache.org on 2007/11/02 12:57:02 UTC
svn commit: r591310 - in /mina/trunk:
core/src/main/java/org/apache/mina/common/
core/src/main/java/org/apache/mina/filter/codec/
core/src/main/java/org/apache/mina/filter/ssl/
core/src/main/java/org/apache/mina/filter/stream/
core/src/main/java/org/ap...
Author: trustin
Date: Fri Nov 2 04:57:00 2007
New Revision: 591310
URL: http://svn.apache.org/viewvc?rev=591310&view=rev
Log:
Resolved issue: DIRMINA-249 (exceptionCaught() should provide more information)
* Added WriteException which contains the list of the failed WriteRequests
* All write-related exceptions extend WriteException now.
* Added NothingWrittenException
* Added WriteToClosedSessionException
* WriteFuture also provides 'exception' property to tell the cause of the write failure; all failed WriteFutures must provide a related exception now.
* All queued write requests which couldn't be written out due to sudden disconnection are notified as an exceptionCaught event with WriteToClosedSessionException now.
* Improved SslFilter to filter out WriteToClosedSessionException which contains a failed close_notify request.
Resolved issue: DIRMINA-302 (Unbounded nature of writeRequestQueue can cause OutOfMemoryException)
* Added WriteThrottleFilter
* Added WriteThrottlePolicy
* Added TooManyScheduledWriteException (extends WriteException)
Added:
mina/trunk/core/src/main/java/org/apache/mina/common/NothingWrittenException.java (with props)
mina/trunk/core/src/main/java/org/apache/mina/common/WriteException.java (with props)
mina/trunk/core/src/main/java/org/apache/mina/common/WriteToClosedSessionException.java (with props)
mina/trunk/core/src/main/java/org/apache/mina/filter/traffic/TooManyScheduledWritesException.java (with props)
mina/trunk/core/src/main/java/org/apache/mina/filter/traffic/WriteThrottleFilter.java (with props)
mina/trunk/core/src/main/java/org/apache/mina/filter/traffic/WriteThrottlePolicy.java (with props)
Modified:
mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoProcessor.java
mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoSession.java
mina/trunk/core/src/main/java/org/apache/mina/common/DefaultIoFilterChain.java
mina/trunk/core/src/main/java/org/apache/mina/common/DefaultWriteFuture.java
mina/trunk/core/src/main/java/org/apache/mina/common/DefaultWriteRequest.java
mina/trunk/core/src/main/java/org/apache/mina/common/IdleStatusChecker.java
mina/trunk/core/src/main/java/org/apache/mina/common/WriteFuture.java
mina/trunk/core/src/main/java/org/apache/mina/common/WriteTimeoutException.java
mina/trunk/core/src/main/java/org/apache/mina/filter/codec/ProtocolCodecFilter.java
mina/trunk/core/src/main/java/org/apache/mina/filter/codec/ProtocolCodecSession.java
mina/trunk/core/src/main/java/org/apache/mina/filter/ssl/SslFilter.java
mina/trunk/core/src/main/java/org/apache/mina/filter/ssl/SslHandler.java
mina/trunk/core/src/main/java/org/apache/mina/filter/stream/StreamWriteFilter.java
mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeFilterChain.java
mina/trunk/core/src/test/java/org/apache/mina/common/FutureTest.java
mina/trunk/core/src/test/java/org/apache/mina/filter/codec/textline/TextLineDecoderTest.java
mina/trunk/core/src/test/java/org/apache/mina/filter/stream/StreamWriteFilterTest.java
mina/trunk/example/src/main/java/org/apache/mina/example/echoserver/EchoProtocolHandler.java
mina/trunk/example/src/test/java/org/apache/mina/example/echoserver/ConnectorTest.java
Modified: mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoProcessor.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoProcessor.java?rev=591310&r1=591309&r2=591310&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoProcessor.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoProcessor.java Fri Nov 2 04:57:00 2007
@@ -21,7 +21,9 @@
import java.io.IOException;
import java.nio.channels.SelectionKey;
+import java.util.ArrayList;
import java.util.Iterator;
+import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
@@ -383,6 +385,8 @@
private void clearWriteRequestQueue(AbstractIoSession session) {
Queue<WriteRequest> writeRequestQueue = session.getWriteRequestQueue();
WriteRequest req;
+
+ List<WriteRequest> failedRequests = new ArrayList<WriteRequest>();
if ((req = writeRequestQueue.poll()) != null) {
Object m = req.getMessage();
@@ -392,18 +396,27 @@
// The first unwritten empty buffer must be
// forwarded to the filter chain.
if (buf.hasRemaining()) {
- req.getFuture().setWritten(false);
+ failedRequests.add(req);
} else {
session.getFilterChain().fireMessageSent(req);
}
} else {
- req.getFuture().setWritten(false);
+ failedRequests.add(req);
}
// Discard others.
while ((req = writeRequestQueue.poll()) != null) {
- req.getFuture().setWritten(false);
+ failedRequests.add(req);
}
+ }
+
+ // Create an exception and notify.
+ if (!failedRequests.isEmpty()) {
+ WriteToClosedSessionException cause = new WriteToClosedSessionException(failedRequests);
+ for (WriteRequest r: failedRequests) {
+ r.getFuture().setException(cause);
+ }
+ session.getFilterChain().fireExceptionCaught(cause);
}
}
Modified: mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoSession.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoSession.java?rev=591310&r1=591309&r2=591310&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoSession.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoSession.java Fri Nov 2 04:57:00 2007
@@ -208,7 +208,10 @@
}
if (isClosing() || !isConnected()) {
- return DefaultWriteFuture.newNotWrittenFuture(this);
+ WriteFuture future = new DefaultWriteFuture(this);
+ WriteRequest request = new DefaultWriteRequest(message, future, remoteAddress);
+ future.setException(new WriteToClosedSessionException(request));
+ return future;
}
FileChannel channel = null;
@@ -222,7 +225,7 @@
message = new DefaultFileRegion(channel, 0, channel.size());
} catch (IOException e) {
ExceptionMonitor.getInstance().exceptionCaught(e);
- return DefaultWriteFuture.newNotWrittenFuture(this);
+ return DefaultWriteFuture.newNotWrittenFuture(this, e);
}
} else if (message instanceof File) {
File file = (File) message;
@@ -230,7 +233,7 @@
channel = new FileInputStream(file).getChannel();
} catch (IOException e) {
ExceptionMonitor.getInstance().exceptionCaught(e);
- return DefaultWriteFuture.newNotWrittenFuture(this);
+ return DefaultWriteFuture.newNotWrittenFuture(this, e);
}
}
Modified: mina/trunk/core/src/main/java/org/apache/mina/common/DefaultIoFilterChain.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/DefaultIoFilterChain.java?rev=591310&r1=591309&r2=591310&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/DefaultIoFilterChain.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/DefaultIoFilterChain.java Fri Nov 2 04:57:00 2007
@@ -417,7 +417,7 @@
}
try {
- request.getFuture().setWritten(true);
+ request.getFuture().setWritten();
} catch (Throwable t) {
fireExceptionCaught(t);
}
@@ -471,7 +471,7 @@
entry.getFilter().filterWrite(entry.getNextFilter(), session,
writeRequest);
} catch (Throwable e) {
- writeRequest.getFuture().setWritten(false);
+ writeRequest.getFuture().setException(e);
fireExceptionCaught(e);
}
}
Modified: mina/trunk/core/src/main/java/org/apache/mina/common/DefaultWriteFuture.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/DefaultWriteFuture.java?rev=591310&r1=591309&r2=591310&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/DefaultWriteFuture.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/DefaultWriteFuture.java Fri Nov 2 04:57:00 2007
@@ -32,16 +32,16 @@
*/
public static WriteFuture newWrittenFuture(IoSession session) {
DefaultWriteFuture unwrittenFuture = new DefaultWriteFuture(session);
- unwrittenFuture.setWritten(true);
+ unwrittenFuture.setWritten();
return unwrittenFuture;
}
/**
* Returns a new {@link DefaultWriteFuture} which is already marked as 'not written'.
*/
- public static WriteFuture newNotWrittenFuture(IoSession session) {
+ public static WriteFuture newNotWrittenFuture(IoSession session, Throwable cause) {
DefaultWriteFuture unwrittenFuture = new DefaultWriteFuture(session);
- unwrittenFuture.setWritten(false);
+ unwrittenFuture.setException(cause);
return unwrittenFuture;
}
@@ -54,13 +54,30 @@
public boolean isWritten() {
if (isReady()) {
- return ((Boolean) getValue()).booleanValue();
+ Object v = getValue();
+ if (v instanceof Boolean) {
+ return ((Boolean) v).booleanValue();
+ }
}
return false;
}
+
+ public Throwable getException() {
+ if (isReady()) {
+ Object v = getValue();
+ if (v instanceof Throwable) {
+ return (Throwable) v;
+ }
+ }
+ return null;
+ }
- public void setWritten(boolean written) {
- setValue(written);
+ public void setWritten() {
+ setValue(Boolean.TRUE);
+ }
+
+ public void setException(Throwable cause) {
+ setValue(cause);
}
@Override
Modified: mina/trunk/core/src/main/java/org/apache/mina/common/DefaultWriteRequest.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/DefaultWriteRequest.java?rev=591310&r1=591309&r2=591310&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/DefaultWriteRequest.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/DefaultWriteRequest.java Fri Nov 2 04:57:00 2007
@@ -34,7 +34,7 @@
return false;
}
- public void setWritten(boolean written) {
+ public void setWritten() {
}
public IoSession getSession() {
@@ -85,6 +85,13 @@
public boolean awaitUninterruptibly(long timeoutMillis) {
return true;
+ }
+
+ public Throwable getException() {
+ return null;
+ }
+
+ public void setException(Throwable cause) {
}
};
Modified: mina/trunk/core/src/main/java/org/apache/mina/common/IdleStatusChecker.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/IdleStatusChecker.java?rev=591310&r1=591309&r2=591310&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/IdleStatusChecker.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/IdleStatusChecker.java Fri Nov 2 04:57:00 2007
@@ -21,6 +21,7 @@
import java.util.HashSet;
import java.util.Iterator;
+import java.util.Queue;
import java.util.Set;
/**
@@ -130,10 +131,22 @@
private static void notifyWriteTimeout(IoSession session,
long currentTime, long writeTimeout, long lastIoTime) {
- if (session instanceof AbstractIoSession &&
- writeTimeout > 0 && currentTime - lastIoTime >= writeTimeout &&
- !((AbstractIoSession) session).getWriteRequestQueue().isEmpty()) {
- session.getFilterChain().fireExceptionCaught(new WriteTimeoutException());
+ if (!(session instanceof AbstractIoSession)) {
+ return;
+ }
+
+ AbstractIoSession s = (AbstractIoSession) session;
+ if (writeTimeout > 0 && currentTime - lastIoTime >= writeTimeout &&
+ !s.getWriteRequestQueue().isEmpty()) {
+ Queue<WriteRequest> queue = s.getWriteRequestQueue();
+ WriteRequest request = queue.peek();
+ if (request != null) {
+ WriteTimeoutException cause = new WriteTimeoutException(request);
+ queue.poll().getFuture().setException(cause);
+ s.getFilterChain().fireExceptionCaught(cause);
+ // WriteException is an IOException, so we close the session.
+ s.close();
+ }
}
}
}
Added: mina/trunk/core/src/main/java/org/apache/mina/common/NothingWrittenException.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/NothingWrittenException.java?rev=591310&view=auto
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/NothingWrittenException.java (added)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/NothingWrittenException.java Fri Nov 2 04:57:00 2007
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.mina.common;
+
+import java.util.Collection;
+
+/**
+ * An exception which is thrown when one or more write requests resulted
+ * in no actual write operation.
+ *
+ * @author The Apache MINA Project (dev@mina.apache.org)
+ * @version $Rev$, $Date$,
+ */
+public class NothingWrittenException extends WriteException {
+
+ private static final long serialVersionUID = -6331979307737691005L;
+
+ public NothingWrittenException(Collection<WriteRequest> requests,
+ String message, Throwable cause) {
+ super(requests, message, cause);
+ }
+
+ public NothingWrittenException(Collection<WriteRequest> requests, String s) {
+ super(requests, s);
+ }
+
+ public NothingWrittenException(Collection<WriteRequest> requests,
+ Throwable cause) {
+ super(requests, cause);
+ }
+
+ public NothingWrittenException(Collection<WriteRequest> requests) {
+ super(requests);
+ }
+
+ public NothingWrittenException(WriteRequest request, String message,
+ Throwable cause) {
+ super(request, message, cause);
+ }
+
+ public NothingWrittenException(WriteRequest request, String s) {
+ super(request, s);
+ }
+
+ public NothingWrittenException(WriteRequest request, Throwable cause) {
+ super(request, cause);
+ }
+
+ public NothingWrittenException(WriteRequest request) {
+ super(request);
+ }
+}
\ No newline at end of file
Propchange: mina/trunk/core/src/main/java/org/apache/mina/common/NothingWrittenException.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: mina/trunk/core/src/main/java/org/apache/mina/common/NothingWrittenException.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: mina/trunk/core/src/main/java/org/apache/mina/common/WriteException.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/WriteException.java?rev=591310&view=auto
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/WriteException.java (added)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/WriteException.java Fri Nov 2 04:57:00 2007
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.mina.common;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * An exception which is thrown when one or more write operations were failed.
+ *
+ * @author The Apache MINA Project (dev@mina.apache.org)
+ * @version $Rev$, $Date$,
+ */
+public class WriteException extends IOException {
+
+ private static final long serialVersionUID = -4174407422754524197L;
+
+ private final List<WriteRequest> requests;
+
+ /**
+ * Creates a new exception.
+ */
+ public WriteException(WriteRequest request) {
+ super();
+ this.requests = asRequestList(request);
+ }
+
+ /**
+ * Creates a new exception.
+ */
+ public WriteException(WriteRequest request, String s) {
+ super(s);
+ this.requests = asRequestList(request);
+ }
+
+ /**
+ * Creates a new exception.
+ */
+ public WriteException(WriteRequest request, String message, Throwable cause) {
+ super(message);
+ initCause(cause);
+ this.requests = asRequestList(request);
+ }
+
+ /**
+ * Creates a new exception.
+ */
+ public WriteException(WriteRequest request, Throwable cause) {
+ initCause(cause);
+ this.requests = asRequestList(request);
+ }
+
+ /**
+ * Creates a new exception.
+ */
+ public WriteException(Collection<WriteRequest> requests) {
+ super();
+ this.requests = asRequestList(requests);
+ }
+
+ /**
+ * Creates a new exception.
+ */
+ public WriteException(Collection<WriteRequest> requests, String s) {
+ super(s);
+ this.requests = asRequestList(requests);
+ }
+
+ /**
+ * Creates a new exception.
+ */
+ public WriteException(Collection<WriteRequest> requests, String message, Throwable cause) {
+ super(message);
+ initCause(cause);
+ this.requests = asRequestList(requests);
+ }
+
+ /**
+ * Creates a new exception.
+ */
+ public WriteException(Collection<WriteRequest> requests, Throwable cause) {
+ initCause(cause);
+ this.requests = asRequestList(requests);
+ }
+
+ /**
+ * Returns the list of the failed {@link WriteRequest}, in the order of occurrance.
+ */
+ public List<WriteRequest> getRequests() {
+ return requests;
+ }
+
+ /**
+ * Returns the firstly failed {@link WriteRequest}.
+ */
+ public WriteRequest getRequest() {
+ return requests.get(0);
+ }
+
+ private static List<WriteRequest> asRequestList(Collection<WriteRequest> requests) {
+ if (requests == null) {
+ throw new NullPointerException("requests");
+ }
+ if (requests.isEmpty()) {
+ throw new IllegalArgumentException("requests is empty.");
+ }
+
+ List<WriteRequest> newRequests = new ArrayList<WriteRequest>(requests.size());
+ for (WriteRequest r: requests) {
+ newRequests.add(unwrapRequest(r));
+ }
+ return Collections.unmodifiableList(newRequests);
+ }
+
+ private static List<WriteRequest> asRequestList(WriteRequest request) {
+ if (request == null) {
+ throw new NullPointerException("request");
+ }
+
+ List<WriteRequest> requests = new ArrayList<WriteRequest>(1);
+ requests.add(unwrapRequest(request));
+ return Collections.unmodifiableList(requests);
+ }
+
+ private static WriteRequest unwrapRequest(WriteRequest request) {
+ while (request instanceof WriteRequestWrapper) {
+ request = ((WriteRequestWrapper) request).getWriteRequest();
+ }
+ return request;
+ }
+}
\ No newline at end of file
Propchange: mina/trunk/core/src/main/java/org/apache/mina/common/WriteException.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: mina/trunk/core/src/main/java/org/apache/mina/common/WriteException.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: mina/trunk/core/src/main/java/org/apache/mina/common/WriteFuture.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/WriteFuture.java?rev=591310&r1=591309&r2=591310&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/WriteFuture.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/WriteFuture.java Fri Nov 2 04:57:00 2007
@@ -47,13 +47,27 @@
* Returns <tt>true</tt> if the write operation is finished successfully.
*/
boolean isWritten();
+
+ /**
+ * Returns the cause of the write failure if and only if the write
+ * operation has failed due to an {@link Exception}. Otherwise,
+ * <tt>null</tt> is returned.
+ */
+ Throwable getException();
/**
- * Sets whether the message is written or not, and notifies all threads
- * waiting for this future. This method is invoked by MINA internally.
- * Please do not call this method directly.
+ * Sets the message is written, and notifies all threads waiting for
+ * this future. This method is invoked by MINA internally. Please do
+ * not call this method directly.
+ */
+ void setWritten();
+
+ /**
+ * Sets the cause of the write failure, and notifies all threads waiting
+ * for this future. This method is invoked by MINA internally. Please
+ * do not call this method directly.
*/
- void setWritten(boolean written);
+ void setException(Throwable cause);
WriteFuture await() throws InterruptedException;
Modified: mina/trunk/core/src/main/java/org/apache/mina/common/WriteTimeoutException.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/WriteTimeoutException.java?rev=591310&r1=591309&r2=591310&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/WriteTimeoutException.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/WriteTimeoutException.java Fri Nov 2 04:57:00 2007
@@ -19,44 +19,51 @@
*/
package org.apache.mina.common;
-import java.io.IOException;
+import java.util.Collection;
+
/**
- * An {@link IOException} which is thrown when write buffer is not flushed for
+ * An exception which is thrown when write buffer is not flushed for
* {@link IoSessionConfig#getWriteTimeout()} seconds.
*
* @author The Apache MINA Project (dev@mina.apache.org)
* @version $Rev$, $Date$,
*/
-public class WriteTimeoutException extends IOException {
+public class WriteTimeoutException extends WriteException {
private static final long serialVersionUID = 3906931157944579121L;
- /**
- * Creates a new exception.
- */
- public WriteTimeoutException() {
- super();
- }
-
- /**
- * Creates a new exception.
- */
- public WriteTimeoutException(String s) {
- super(s);
- }
-
- /**
- * Creates a new exception.
- */
- public WriteTimeoutException(String message, Throwable cause) {
- super(message);
- initCause(cause);
- }
-
- /**
- * Creates a new exception.
- */
- public WriteTimeoutException(Throwable cause) {
- initCause(cause);
+ public WriteTimeoutException(Collection<WriteRequest> requests,
+ String message, Throwable cause) {
+ super(requests, message, cause);
+ }
+
+ public WriteTimeoutException(Collection<WriteRequest> requests, String s) {
+ super(requests, s);
+ }
+
+ public WriteTimeoutException(Collection<WriteRequest> requests,
+ Throwable cause) {
+ super(requests, cause);
+ }
+
+ public WriteTimeoutException(Collection<WriteRequest> requests) {
+ super(requests);
+ }
+
+ public WriteTimeoutException(WriteRequest request, String message,
+ Throwable cause) {
+ super(request, message, cause);
+ }
+
+ public WriteTimeoutException(WriteRequest request, String s) {
+ super(request, s);
+ }
+
+ public WriteTimeoutException(WriteRequest request, Throwable cause) {
+ super(request, cause);
+ }
+
+ public WriteTimeoutException(WriteRequest request) {
+ super(request);
}
}
Added: mina/trunk/core/src/main/java/org/apache/mina/common/WriteToClosedSessionException.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/WriteToClosedSessionException.java?rev=591310&view=auto
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/WriteToClosedSessionException.java (added)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/WriteToClosedSessionException.java Fri Nov 2 04:57:00 2007
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.mina.common;
+
+import java.util.Collection;
+
+/**
+ * An exception which is thrown when one or more write operations were
+ * attempted on a closed session.
+ *
+ * @author The Apache MINA Project (dev@mina.apache.org)
+ * @version $Rev$, $Date$,
+ */
+public class WriteToClosedSessionException extends WriteException {
+
+ private static final long serialVersionUID = 5550204573739301393L;
+
+ public WriteToClosedSessionException(Collection<WriteRequest> requests,
+ String message, Throwable cause) {
+ super(requests, message, cause);
+ }
+
+ public WriteToClosedSessionException(Collection<WriteRequest> requests,
+ String s) {
+ super(requests, s);
+ }
+
+ public WriteToClosedSessionException(Collection<WriteRequest> requests,
+ Throwable cause) {
+ super(requests, cause);
+ }
+
+ public WriteToClosedSessionException(Collection<WriteRequest> requests) {
+ super(requests);
+ }
+
+ public WriteToClosedSessionException(WriteRequest request, String message,
+ Throwable cause) {
+ super(request, message, cause);
+ }
+
+ public WriteToClosedSessionException(WriteRequest request, String s) {
+ super(request, s);
+ }
+
+ public WriteToClosedSessionException(WriteRequest request, Throwable cause) {
+ super(request, cause);
+ }
+
+ public WriteToClosedSessionException(WriteRequest request) {
+ super(request);
+ }
+}
\ No newline at end of file
Propchange: mina/trunk/core/src/main/java/org/apache/mina/common/WriteToClosedSessionException.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: mina/trunk/core/src/main/java/org/apache/mina/common/WriteToClosedSessionException.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: mina/trunk/core/src/main/java/org/apache/mina/filter/codec/ProtocolCodecFilter.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/filter/codec/ProtocolCodecFilter.java?rev=591310&r1=591309&r2=591310&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/filter/codec/ProtocolCodecFilter.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/filter/codec/ProtocolCodecFilter.java Fri Nov 2 04:57:00 2007
@@ -31,6 +31,7 @@
import org.apache.mina.common.IoFilterChain;
import org.apache.mina.common.IoSession;
import org.apache.mina.common.IoSessionLogger;
+import org.apache.mina.common.NothingWrittenException;
import org.apache.mina.common.WriteFuture;
import org.apache.mina.common.WriteRequest;
import org.apache.mina.common.WriteRequestWrapper;
@@ -223,7 +224,7 @@
try {
encoder.encode(session, message, encoderOut);
- encoderOut.flush();
+ encoderOut.flushWithoutFuture();
nextFilter.filterWrite(session, new MessageWriteRequest(
writeRequest));
} catch (Throwable t) {
@@ -403,10 +404,28 @@
}
if (future == null) {
- future = DefaultWriteFuture.newNotWrittenFuture(session);
+ future = DefaultWriteFuture.newNotWrittenFuture(
+ session, new NothingWrittenException(writeRequest));
}
return future;
+ }
+
+ public void flushWithoutFuture() {
+ Queue<IoBuffer> bufferQueue = getBufferQueue();
+ for (;;) {
+ IoBuffer buf = bufferQueue.poll();
+ if (buf == null) {
+ break;
+ }
+
+ // Flush only when the buffer has remaining.
+ if (buf.hasRemaining()) {
+ nextFilter.filterWrite(
+ session, new EncodedWriteRequest(
+ buf, null, writeRequest.getDestination()));
+ }
+ }
}
}
}
Modified: mina/trunk/core/src/main/java/org/apache/mina/filter/codec/ProtocolCodecSession.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/filter/codec/ProtocolCodecSession.java?rev=591310&r1=591309&r2=591310&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/filter/codec/ProtocolCodecSession.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/filter/codec/ProtocolCodecSession.java Fri Nov 2 04:57:00 2007
@@ -60,7 +60,7 @@
public class ProtocolCodecSession extends DummySession {
private final WriteFuture notWrittenFuture =
- DefaultWriteFuture.newNotWrittenFuture(this);
+ DefaultWriteFuture.newNotWrittenFuture(this, new UnsupportedOperationException());
private final AbstractProtocolEncoderOutput encoderOutput =
new AbstractProtocolEncoderOutput() {
Modified: mina/trunk/core/src/main/java/org/apache/mina/filter/ssl/SslFilter.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/filter/ssl/SslFilter.java?rev=591310&r1=591309&r2=591310&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/filter/ssl/SslFilter.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/filter/ssl/SslFilter.java Fri Nov 2 04:57:00 2007
@@ -20,6 +20,8 @@
package org.apache.mina.filter.ssl;
import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
@@ -40,6 +42,7 @@
import org.apache.mina.common.WriteFuture;
import org.apache.mina.common.WriteRequest;
import org.apache.mina.common.WriteRequestWrapper;
+import org.apache.mina.common.WriteToClosedSessionException;
/**
* An SSL filter that encrypts and decrypts the data exchanged in the session.
@@ -464,6 +467,63 @@
}
@Override
+ public void exceptionCaught(NextFilter nextFilter, IoSession session,
+ Throwable cause) throws Exception {
+
+ if (cause instanceof WriteToClosedSessionException) {
+ // Filter out SSL close notify, which is likely to fail to flush
+ // due to disconnection.
+ WriteToClosedSessionException e = (WriteToClosedSessionException) cause;
+ List<WriteRequest> failedRequests = e.getRequests();
+ boolean containsCloseNotify = false;
+ for (WriteRequest r: failedRequests) {
+ if (isCloseNotify(r.getMessage())) {
+ containsCloseNotify = true;
+ break;
+ }
+ }
+
+ if (containsCloseNotify) {
+ if (failedRequests.size() == 1) {
+ // close notify is the only failed request; bail out.
+ return;
+ }
+
+ List<WriteRequest> newFailedRequests =
+ new ArrayList<WriteRequest>(failedRequests.size() - 1);
+ for (WriteRequest r: failedRequests) {
+ if (!isCloseNotify(r.getMessage())) {
+ newFailedRequests.add(r);
+ }
+ }
+
+ if (newFailedRequests.isEmpty()) {
+ // the failedRequests were full with close notify; bail out.
+ return;
+ }
+
+ cause = new WriteToClosedSessionException(
+ newFailedRequests, cause.getMessage(), cause.getCause());
+ }
+ }
+
+ nextFilter.exceptionCaught(session, cause);
+ }
+
+ private boolean isCloseNotify(Object message) {
+ if (!(message instanceof IoBuffer)) {
+ return false;
+ }
+
+ IoBuffer buf = (IoBuffer) message;
+ int offset = buf.position();
+ return buf.remaining() == 23 &&
+ buf.get(offset + 0) == 0x15 && buf.get(offset + 1) == 0x03 &&
+ buf.get(offset + 2) == 0x01 && buf.get(offset + 3) == 0x00 &&
+ buf.get(offset + 4) == 0x12;
+ }
+
+ @Override
public void filterWrite(NextFilter nextFilter, IoSession session,
WriteRequest writeRequest) throws SSLException {
boolean needsFlush = true;
@@ -582,11 +642,15 @@
SslHandler handler = getSslSessionHandler(session);
// if already shut down
if (!handler.closeOutbound()) {
- return DefaultWriteFuture.newNotWrittenFuture(session);
+ return DefaultWriteFuture.newNotWrittenFuture(
+ session, new IllegalStateException("SSL session is shut down already."));
}
// there might be data to write out here?
WriteFuture future = handler.writeNetBuffer(nextFilter);
+ if (future == null) {
+ future = DefaultWriteFuture.newWrittenFuture(session);
+ }
if (handler.isInboundDone()) {
handler.destroy();
Modified: mina/trunk/core/src/main/java/org/apache/mina/filter/ssl/SslHandler.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/filter/ssl/SslHandler.java?rev=591310&r1=591309&r2=591310&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/filter/ssl/SslHandler.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/filter/ssl/SslHandler.java Fri Nov 2 04:57:00 2007
@@ -540,7 +540,7 @@
// Check if any net data needed to be writen
if (!getOutNetBuffer().hasRemaining()) {
// no; bail out
- return DefaultWriteFuture.newNotWrittenFuture(session);
+ return null;
}
// set flag that we are writing encrypted data
@@ -555,7 +555,7 @@
logger.debug( " write outNetBuffer: "
+ getOutNetBuffer());
}
- org.apache.mina.common.IoBuffer writeBuffer = copy(getOutNetBuffer());
+ IoBuffer writeBuffer = copy(getOutNetBuffer());
if (logger.isDebugEnabled()) {
logger.debug( " session write: " + writeBuffer);
}
@@ -580,7 +580,7 @@
logger.debug( " write outNetBuffer2: "
+ getOutNetBuffer());
}
- org.apache.mina.common.IoBuffer writeBuffer2 = copy(getOutNetBuffer());
+ IoBuffer writeBuffer2 = copy(getOutNetBuffer());
writeFuture = new DefaultWriteFuture(session);
parent.filterWrite(nextFilter, session,
new DefaultWriteRequest(writeBuffer2, writeFuture));
@@ -590,11 +590,7 @@
writingEncryptedData = false;
}
- if (writeFuture != null) {
- return writeFuture;
- } else {
- return DefaultWriteFuture.newNotWrittenFuture(session);
- }
+ return writeFuture;
}
private void unwrap(NextFilter nextFilter) throws SSLException {
Modified: mina/trunk/core/src/main/java/org/apache/mina/filter/stream/StreamWriteFilter.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/filter/stream/StreamWriteFilter.java?rev=591310&r1=591309&r2=591310&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/filter/stream/StreamWriteFilter.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/filter/stream/StreamWriteFilter.java Fri Nov 2 04:57:00 2007
@@ -31,6 +31,7 @@
import org.apache.mina.common.IoFilterChain;
import org.apache.mina.common.IoSession;
import org.apache.mina.common.WriteRequest;
+import org.apache.mina.common.IoFilter.NextFilter;
/**
* Filter implementation which makes it possible to write {@link InputStream}
@@ -105,7 +106,7 @@
IoBuffer buffer = getNextBuffer(inputStream);
if (buffer == null) {
// End of stream reached.
- writeRequest.getFuture().setWritten(true);
+ writeRequest.getFuture().setWritten();
nextFilter.messageSent(session, writeRequest);
} else {
session.setAttribute(CURRENT_STREAM, inputStream);
@@ -156,7 +157,7 @@
}
}
- currentWriteRequest.getFuture().setWritten(true);
+ currentWriteRequest.getFuture().setWritten();
nextFilter.messageSent(session, currentWriteRequest);
} else {
nextFilter.filterWrite(session, new DefaultWriteRequest(
Added: mina/trunk/core/src/main/java/org/apache/mina/filter/traffic/TooManyScheduledWritesException.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/filter/traffic/TooManyScheduledWritesException.java?rev=591310&view=auto
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/filter/traffic/TooManyScheduledWritesException.java (added)
+++ mina/trunk/core/src/main/java/org/apache/mina/filter/traffic/TooManyScheduledWritesException.java Fri Nov 2 04:57:00 2007
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.mina.filter.traffic;
+
+import java.util.Collection;
+
+import org.apache.mina.common.IoSession;
+import org.apache.mina.common.WriteException;
+import org.apache.mina.common.WriteRequest;
+
+/**
+ * An exception that is thrown by {@link WriteThrottleFilter} when
+ * there are too many scheduled write requests or too much amount
+ * of scheduled write data in an {@link IoSession}'s internal write
+ * request queue.
+ *
+ * @author The Apache MINA Project (dev@mina.apache.org)
+ * @version $Rev$, $Date$
+ */
+public class TooManyScheduledWritesException extends WriteException {
+
+ private static final long serialVersionUID = 7377810360950976904L;
+
+ public TooManyScheduledWritesException(Collection<WriteRequest> requests,
+ String message, Throwable cause) {
+ super(requests, message, cause);
+ }
+
+ public TooManyScheduledWritesException(Collection<WriteRequest> requests,
+ String s) {
+ super(requests, s);
+ }
+
+ public TooManyScheduledWritesException(Collection<WriteRequest> requests,
+ Throwable cause) {
+ super(requests, cause);
+ }
+
+ public TooManyScheduledWritesException(Collection<WriteRequest> requests) {
+ super(requests);
+ }
+
+ public TooManyScheduledWritesException(WriteRequest request,
+ String message, Throwable cause) {
+ super(request, message, cause);
+ }
+
+ public TooManyScheduledWritesException(WriteRequest request, String s) {
+ super(request, s);
+ }
+
+ public TooManyScheduledWritesException(WriteRequest request, Throwable cause) {
+ super(request, cause);
+ }
+
+ public TooManyScheduledWritesException(WriteRequest request) {
+ super(request);
+ }
+}
Propchange: mina/trunk/core/src/main/java/org/apache/mina/filter/traffic/TooManyScheduledWritesException.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: mina/trunk/core/src/main/java/org/apache/mina/filter/traffic/TooManyScheduledWritesException.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: mina/trunk/core/src/main/java/org/apache/mina/filter/traffic/WriteThrottleFilter.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/filter/traffic/WriteThrottleFilter.java?rev=591310&view=auto
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/filter/traffic/WriteThrottleFilter.java (added)
+++ mina/trunk/core/src/main/java/org/apache/mina/filter/traffic/WriteThrottleFilter.java Fri Nov 2 04:57:00 2007
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.mina.filter.traffic;
+
+import org.apache.mina.common.IoFilterAdapter;
+import org.apache.mina.common.IoSession;
+import org.apache.mina.common.IoSessionLogger;
+import org.apache.mina.common.WriteException;
+import org.apache.mina.common.WriteRequest;
+
+/**
+ * This filter will turn the asynchronous write method in to a blocking send when there are more than
+ * the prescribed number of messages awaiting sending. It should be used in conjunction with the
+ * {@link ReadThrottleFilterBuilder} on a server as the blocking writes will allow the read thread to
+ * cause an Out of Memory exception due to a back log of unprocessed messages.
+ *
+ * This is should only be viewed as a temporary work around for DIRMINA-302.
+ *
+ * A true solution should not be implemented as a filter as this issue will always occur. On a machine
+ * where the network is slower than the local producer.
+ *
+ * Suggested improvement is to allow implementation of policices on what to do when buffer is full.
+ *
+ * They could be:
+ * Block - As this does
+ * Wait on a given Future - to drain more of the queue.. in essence this filter with high/low watermarks
+ * Throw Exception - through the client write() method to allow them to get immediate feedback on buffer state
+ *
+ * <p/>
+ * <p>Usage:
+ * <p/>
+ * <pre><code>
+ * DefaultFilterChainBuilder builder = ...
+ * WriteBufferLimitFilterBuilder filter = new WriteBufferLimitFilterBuilder();
+ * filter.attach( builder );
+ * </code></pre>
+ * <p/>
+ * or
+ * <p/>
+ * <pre><code>
+ * IoFilterChain chain = ...
+ * WriteBufferLimitFilterBuilder filter = new WriteBufferLimitFilterBuilder();
+ * filter.attach( chain );
+ * </code></pre>
+ *
+ * @author The Apache MINA Project (dev@mina.apache.org)
+ * @version $Rev$, $Date$
+ */
+public class WriteThrottleFilter extends IoFilterAdapter {
+
+ private final Object logLock = new Object();
+ private final Object blockLock = new Object();
+
+ private long lastLogTime = -1;
+ private int blockWaiters = 0;
+
+ private volatile WriteThrottlePolicy policy;
+
+ private volatile int localMaxScheduledWriteMessages;
+ private volatile long localMaxScheduledWriteBytes;
+ private volatile int globalMaxScheduledWriteMessages;
+ private volatile long globalMaxScheduledWriteBytes;
+
+ public WriteThrottleFilter() {
+ this(WriteThrottlePolicy.LOG);
+ }
+
+ public WriteThrottleFilter(WriteThrottlePolicy policy) {
+ this(policy, 4096, 65536, 131072, 1048576 * 128);
+ }
+
+ public WriteThrottleFilter(
+ int localMaxScheduledWriteMessages, long localMaxScheduledWriteBytes,
+ int globalMaxScheduledWriteMessages, long globalMaxScheduledWriteBytes) {
+ this(WriteThrottlePolicy.LOG,
+ localMaxScheduledWriteMessages, localMaxScheduledWriteBytes,
+ globalMaxScheduledWriteMessages, globalMaxScheduledWriteBytes);
+ }
+
+ public WriteThrottleFilter(
+ WriteThrottlePolicy policy,
+ int localMaxScheduledWriteMessages, long localMaxScheduledWriteBytes,
+ int globalMaxScheduledWriteMessages, long globalMaxScheduledWriteBytes) {
+
+ setPolicy(policy);
+ setLocalMaxScheduledWriteMessages(localMaxScheduledWriteMessages);
+ setLocalMaxScheduledWriteBytes(localMaxScheduledWriteBytes);
+ setGlobalMaxScheduledWriteMessages(globalMaxScheduledWriteMessages);
+ setGlobalMaxScheduledWriteBytes(globalMaxScheduledWriteBytes);
+ }
+
+ public WriteThrottlePolicy getPolicy() {
+ return policy;
+ }
+
+ public void setPolicy(WriteThrottlePolicy policy) {
+ if (policy == null) {
+ throw new NullPointerException("policy");
+ }
+ this.policy = policy;
+ }
+
+ public int getLocalMaxScheduledWriteMessages() {
+ return localMaxScheduledWriteMessages;
+ }
+
+ public void setLocalMaxScheduledWriteMessages(int localMaxScheduledWriteMessages) {
+ if (localMaxScheduledWriteMessages < 0) {
+ localMaxScheduledWriteMessages = 0;
+ }
+ this.localMaxScheduledWriteMessages = localMaxScheduledWriteMessages;
+ }
+
+ public long getLocalMaxScheduledWriteBytes() {
+ return localMaxScheduledWriteBytes;
+ }
+
+ public void setLocalMaxScheduledWriteBytes(long localMaxScheduledWriteBytes) {
+ if (localMaxScheduledWriteBytes < 0) {
+ localMaxScheduledWriteBytes = 0;
+ }
+ this.localMaxScheduledWriteBytes = localMaxScheduledWriteBytes;
+ }
+
+ public int getGlobalMaxScheduledWriteMessages() {
+ return globalMaxScheduledWriteMessages;
+ }
+
+ public void setGlobalMaxScheduledWriteMessages(int globalMaxScheduledWriteMessages) {
+ if (globalMaxScheduledWriteMessages < 0) {
+ globalMaxScheduledWriteMessages = 0;
+ }
+ this.globalMaxScheduledWriteMessages = globalMaxScheduledWriteMessages;
+ }
+
+ public long getGlobalMaxScheduledWriteBytes() {
+ return globalMaxScheduledWriteBytes;
+ }
+
+ public void setGlobalMaxScheduledWriteBytes(long globalMaxScheduledWriteBytes) {
+ if (globalMaxScheduledWriteBytes < 0) {
+ globalMaxScheduledWriteBytes = 0;
+ }
+ this.globalMaxScheduledWriteBytes = globalMaxScheduledWriteBytes;
+ }
+
+ @Override
+ public void filterWrite(NextFilter nextFilter, IoSession session,
+ WriteRequest writeRequest) throws Exception {
+
+ WriteThrottlePolicy policy = getPolicy();
+ if (policy != WriteThrottlePolicy.OFF) {
+ if (!readyToWrite(session)) {
+ switch (getPolicy()) {
+ case LOG:
+ log(session);
+ break;
+ case BLOCK:
+ block(session);
+ break;
+ case LOG_AND_BLOCK:
+ log(session);
+ block(session);
+ break;
+ case EXCEPTION:
+ raiseException(session, writeRequest);
+ default:
+ throw new InternalError();
+ }
+ }
+ }
+
+ nextFilter.filterWrite(session, writeRequest);
+ }
+
+ private boolean readyToWrite(IoSession session) {
+ if (session.isClosing()) {
+ return true;
+ }
+
+ int lmswm = localMaxScheduledWriteMessages;
+ long lmswb = localMaxScheduledWriteBytes;
+ int gmswm = globalMaxScheduledWriteMessages;
+ long gmswb = globalMaxScheduledWriteBytes;
+
+ return (lmswm == 0 || session.getScheduledWriteMessages() < lmswm) &&
+ (lmswb == 0 || session.getScheduledWriteBytes() < lmswb) &&
+ (gmswm == 0 || session.getService().getScheduledWriteMessages() < gmswm) &&
+ (gmswb == 0 || session.getService().getScheduledWriteBytes() < gmswb);
+ }
+
+ private void log(IoSession session) {
+ long currentTime = System.currentTimeMillis();
+
+ // Prevent log flood by logging every 3 seconds.
+ boolean log;
+ synchronized (logLock) {
+ if (currentTime - lastLogTime > 3000) {
+ lastLogTime = currentTime;
+ log = true;
+ } else {
+ log = false;
+ }
+ }
+
+ if (log) {
+ IoSessionLogger.getLogger(session, getClass()).warn(getMessage(session));
+ }
+ }
+
+ @Override
+ public void messageSent(
+ NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception {
+ notifyWaitingWriters();
+ nextFilter.messageSent(session, writeRequest);
+ }
+
+ @Override
+ public void exceptionCaught(NextFilter nextFilter, IoSession session,
+ Throwable cause) throws Exception {
+ try {
+ nextFilter.exceptionCaught(session, cause);
+ } finally {
+ notifyWaitingWriters();
+ }
+ }
+
+ @Override
+ public void sessionClosed(NextFilter nextFilter, IoSession session)
+ throws Exception {
+ notifyWaitingWriters();
+ nextFilter.sessionClosed(session);
+ }
+
+ private void block(IoSession session) {
+ synchronized (blockLock) {
+ blockWaiters ++;
+ while (!readyToWrite(session)) {
+ try {
+ blockLock.wait();
+ } catch (InterruptedException e) {
+ // Ignore.
+ }
+ }
+ blockWaiters --;
+ }
+ }
+
+ private void notifyWaitingWriters() {
+ synchronized (blockLock) {
+ if (blockWaiters != 0) {
+ blockLock.notifyAll();
+ }
+ }
+ }
+
+ private void raiseException(IoSession session, WriteRequest writeRequest) throws WriteException {
+ throw new TooManyScheduledWritesException(writeRequest, getMessage(session));
+ }
+
+ private String getMessage(IoSession session) {
+ int lmswm = localMaxScheduledWriteMessages;
+ long lmswb = localMaxScheduledWriteBytes;
+ int gmswm = globalMaxScheduledWriteMessages;
+ long gmswb = globalMaxScheduledWriteBytes;
+
+ StringBuilder buf = new StringBuilder(512);
+ buf.append("Write requests flooded - local: ");
+ if (lmswm != 0) {
+ buf.append(session.getScheduledWriteMessages());
+ buf.append(" / ");
+ buf.append(lmswm);
+ buf.append(" msgs, ");
+ } else {
+ buf.append(session.getScheduledWriteMessages());
+ buf.append(" / unlimited msgs, ");
+ }
+
+ if (lmswb != 0) {
+ buf.append(session.getScheduledWriteBytes());
+ buf.append(" / ");
+ buf.append(lmswb);
+ buf.append(" bytes, ");
+ } else {
+ buf.append(session.getScheduledWriteBytes());
+ buf.append(" / unlimited bytes, ");
+ }
+
+ buf.append("global: ");
+ if (gmswm != 0) {
+ buf.append(session.getService().getScheduledWriteMessages());
+ buf.append(" / ");
+ buf.append(gmswm);
+ buf.append(" msgs, ");
+ } else {
+ buf.append(session.getService().getScheduledWriteMessages());
+ buf.append(" / unlimited msgs, ");
+ }
+
+ if (gmswb != 0) {
+ buf.append(session.getService().getScheduledWriteBytes());
+ buf.append(" / ");
+ buf.append(gmswb);
+ buf.append(" bytes");
+ } else {
+ buf.append(session.getService().getScheduledWriteBytes());
+ buf.append(" / unlimited bytes");
+ }
+
+ return buf.toString();
+ }
+}
Propchange: mina/trunk/core/src/main/java/org/apache/mina/filter/traffic/WriteThrottleFilter.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: mina/trunk/core/src/main/java/org/apache/mina/filter/traffic/WriteThrottleFilter.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: mina/trunk/core/src/main/java/org/apache/mina/filter/traffic/WriteThrottlePolicy.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/filter/traffic/WriteThrottlePolicy.java?rev=591310&view=auto
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/filter/traffic/WriteThrottlePolicy.java (added)
+++ mina/trunk/core/src/main/java/org/apache/mina/filter/traffic/WriteThrottlePolicy.java Fri Nov 2 04:57:00 2007
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.mina.filter.traffic;
+
+/**
+ * Tells {@link WriteThrottleFilter} what to do when there are too many
+ * scheduled write requests in the session buffer.
+ *
+ * @author The Apache MINA Project (dev@mina.apache.org)
+ * @version $Rev$, $Date$
+ */
+public enum WriteThrottlePolicy {
+ /**
+ * Do nothing; disables the filter.
+ */
+ OFF,
+ /**
+ * Log a warning message, but doesn't limit anything.
+ */
+ LOG,
+ /**
+ * Block the write operation until the size of write request buffer
+ * is full. You must use this policy in conjunction with the
+ * {@link ReadThrottleFilterChainBuilder} to prevent the
+ * {@link OutOfMemoryError} on the reader side.
+ */
+ BLOCK,
+ /**
+ * Combination of {@link #WARN} and {@link #BLOCK}.
+ */
+ LOG_AND_BLOCK,
+ /**
+ * Raise a {@link TooManyScheduledWritesException}.
+ */
+ EXCEPTION,
+}
Propchange: mina/trunk/core/src/main/java/org/apache/mina/filter/traffic/WriteThrottlePolicy.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: mina/trunk/core/src/main/java/org/apache/mina/filter/traffic/WriteThrottlePolicy.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeFilterChain.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeFilterChain.java?rev=591310&r1=591309&r2=591310&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeFilterChain.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeFilterChain.java Fri Nov 2 04:57:00 2007
@@ -33,6 +33,7 @@
import org.apache.mina.common.IoProcessor;
import org.apache.mina.common.IoSession;
import org.apache.mina.common.WriteRequest;
+import org.apache.mina.common.WriteToClosedSessionException;
/**
* @author The Apache MINA Project (dev@mina.apache.org)
@@ -199,9 +200,13 @@
flushPendingDataQueues(s);
}
} else {
- WriteRequest req;
- while ((req = queue.poll()) != null) {
- req.getFuture().setWritten(false);
+ List<WriteRequest> failedRequests = new ArrayList<WriteRequest>(queue);
+ if (!failedRequests.isEmpty()) {
+ WriteToClosedSessionException cause = new WriteToClosedSessionException(failedRequests);
+ for (WriteRequest r: failedRequests) {
+ r.getFuture().setException(cause);
+ }
+ s.getFilterChain().fireExceptionCaught(cause);
}
}
}
Modified: mina/trunk/core/src/test/java/org/apache/mina/common/FutureTest.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/test/java/org/apache/mina/common/FutureTest.java?rev=591310&r1=591309&r2=591310&view=diff
==============================================================================
--- mina/trunk/core/src/test/java/org/apache/mina/common/FutureTest.java (original)
+++ mina/trunk/core/src/test/java/org/apache/mina/common/FutureTest.java Fri Nov 2 04:57:00 2007
@@ -94,7 +94,7 @@
TestThread thread = new TestThread(future);
thread.start();
- future.setWritten(true);
+ future.setWritten();
thread.join();
assertTrue(thread.success);
@@ -105,12 +105,13 @@
thread = new TestThread(future);
thread.start();
- future.setWritten(false);
+ future.setException(new Exception());
thread.join();
assertTrue(thread.success);
assertTrue(future.isReady());
assertFalse(future.isWritten());
+ assertTrue(future.getException().getClass() == Exception.class);
}
public void testAddListener() throws Exception {
Modified: mina/trunk/core/src/test/java/org/apache/mina/filter/codec/textline/TextLineDecoderTest.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/test/java/org/apache/mina/filter/codec/textline/TextLineDecoderTest.java?rev=591310&r1=591309&r2=591310&view=diff
==============================================================================
--- mina/trunk/core/src/test/java/org/apache/mina/filter/codec/textline/TextLineDecoderTest.java (original)
+++ mina/trunk/core/src/test/java/org/apache/mina/filter/codec/textline/TextLineDecoderTest.java Fri Nov 2 04:57:00 2007
@@ -25,10 +25,10 @@
import junit.framework.Assert;
import junit.framework.TestCase;
-import org.apache.mina.common.BufferDataException;
import org.apache.mina.common.IoBuffer;
import org.apache.mina.filter.codec.ProtocolCodecSession;
import org.apache.mina.filter.codec.ProtocolDecoderOutput;
+import org.apache.mina.filter.codec.RecoverableProtocolDecoderException;
/**
* Tests {@link TextLineDecoder}.
@@ -273,7 +273,7 @@
try {
decoder.decode(session, in, out);
Assert.fail();
- } catch (BufferDataException e) {
+ } catch (RecoverableProtocolDecoderException e) {
// Success!
}
@@ -296,7 +296,7 @@
try {
decoder.decode(session, in, out);
Assert.fail();
- } catch (BufferDataException e) {
+ } catch (RecoverableProtocolDecoderException e) {
// Success!
}
Modified: mina/trunk/core/src/test/java/org/apache/mina/filter/stream/StreamWriteFilterTest.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/test/java/org/apache/mina/filter/stream/StreamWriteFilterTest.java?rev=591310&r1=591309&r2=591310&view=diff
==============================================================================
--- mina/trunk/core/src/test/java/org/apache/mina/filter/stream/StreamWriteFilterTest.java (original)
+++ mina/trunk/core/src/test/java/org/apache/mina/filter/stream/StreamWriteFilterTest.java Fri Nov 2 04:57:00 2007
@@ -535,8 +535,8 @@
return written;
}
- public void setWritten(boolean written) {
- this.written = written;
+ public void setWritten() {
+ this.written = true;
}
public IoSession getSession() {
@@ -589,6 +589,14 @@
public boolean awaitUninterruptibly(long timeoutMillis) {
return true;
+ }
+
+ public Throwable getException() {
+ return null;
+ }
+
+ public void setException(Throwable cause) {
+ throw new IllegalStateException();
}
}
}
Modified: mina/trunk/example/src/main/java/org/apache/mina/example/echoserver/EchoProtocolHandler.java
URL: http://svn.apache.org/viewvc/mina/trunk/example/src/main/java/org/apache/mina/example/echoserver/EchoProtocolHandler.java?rev=591310&r1=591309&r2=591310&view=diff
==============================================================================
--- mina/trunk/example/src/main/java/org/apache/mina/example/echoserver/EchoProtocolHandler.java (original)
+++ mina/trunk/example/src/main/java/org/apache/mina/example/echoserver/EchoProtocolHandler.java Fri Nov 2 04:57:00 2007
@@ -19,16 +19,16 @@
*/
package org.apache.mina.example.echoserver;
-import org.apache.mina.common.IoBuffer;
import org.apache.mina.common.IdleStatus;
+import org.apache.mina.common.IoBuffer;
import org.apache.mina.common.IoHandler;
import org.apache.mina.common.IoHandlerAdapter;
import org.apache.mina.common.IoSession;
+import org.apache.mina.common.IoSessionLogger;
+import org.apache.mina.common.WriteException;
import org.apache.mina.filter.ssl.SslFilter;
import org.apache.mina.transport.socket.SocketSession;
import org.apache.mina.transport.socket.SocketSessionConfig;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* {@link IoHandler} implementation for echo server.
@@ -37,9 +37,6 @@
* @version $Rev$, $Date$,
*/
public class EchoProtocolHandler extends IoHandlerAdapter {
- private static final Logger log = LoggerFactory
- .getLogger(EchoProtocolHandler.class);
-
@Override
public void sessionCreated(IoSession session) {
if (session instanceof SocketSession) {
@@ -55,13 +52,17 @@
@Override
public void sessionIdle(IoSession session, IdleStatus status) {
- log.info("*** IDLE #" + session.getIdleCount(IdleStatus.BOTH_IDLE)
- + " ***");
+ IoSessionLogger.getLogger(session).info(
+ "*** IDLE #" + session.getIdleCount(IdleStatus.BOTH_IDLE) + " ***");
}
@Override
public void exceptionCaught(IoSession session, Throwable cause) {
- cause.printStackTrace();
+ IoSessionLogger.getLogger(session).warn(cause);
+ if (cause instanceof WriteException) {
+ WriteException e = (WriteException) cause;
+ IoSessionLogger.getLogger(session).warn("Failed write requests: {}", e.getRequests());
+ }
session.close();
}
Modified: mina/trunk/example/src/test/java/org/apache/mina/example/echoserver/ConnectorTest.java
URL: http://svn.apache.org/viewvc/mina/trunk/example/src/test/java/org/apache/mina/example/echoserver/ConnectorTest.java?rev=591310&r1=591309&r2=591310&view=diff
==============================================================================
--- mina/trunk/example/src/test/java/org/apache/mina/example/echoserver/ConnectorTest.java (original)
+++ mina/trunk/example/src/test/java/org/apache/mina/example/echoserver/ConnectorTest.java Fri Nov 2 04:57:00 2007
@@ -23,13 +23,14 @@
import junit.framework.Assert;
-import org.apache.mina.common.IoBuffer;
import org.apache.mina.common.ConnectFuture;
+import org.apache.mina.common.IoBuffer;
import org.apache.mina.common.IoConnector;
import org.apache.mina.common.IoHandlerAdapter;
import org.apache.mina.common.IoSession;
import org.apache.mina.common.IoSessionLogger;
import org.apache.mina.common.RuntimeIoException;
+import org.apache.mina.common.WriteException;
import org.apache.mina.common.WriteFuture;
import org.apache.mina.example.echoserver.ssl.BogusSslContextFactory;
import org.apache.mina.filter.ssl.SslFilter;
@@ -50,6 +51,7 @@
private final int DATA_SIZE = 16;
+ private EchoConnectorHandler handler;
private SslFilter connectorSSLFilter;
public ConnectorTest() {
@@ -58,7 +60,7 @@
@Override
protected void setUp() throws Exception {
super.setUp();
-
+ handler = new EchoConnectorHandler();
connectorSSLFilter = new SslFilter(BogusSslContextFactory
.getInstance(false));
connectorSSLFilter.setUseClientMode(true); // set client mode
@@ -85,6 +87,8 @@
}
private void testConnector(IoConnector connector) throws Exception {
+ connector.setHandler(handler);
+
System.out.println("* Without localAddress");
testConnector(connector, false);
@@ -94,11 +98,8 @@
private void testConnector(IoConnector connector, boolean useLocalAddress)
throws Exception {
- EchoConnectorHandler handler = new EchoConnectorHandler();
-
IoSession session = null;
if (!useLocalAddress) {
- connector.setHandler(handler);
ConnectFuture future = connector.connect(new InetSocketAddress(
"localhost", port));
future.awaitUninterruptibly();
@@ -109,7 +110,6 @@
clientPort = AvailablePortFinder
.getNextAvailable(clientPort + 1);
try {
- connector.setHandler(handler);
ConnectFuture future = connector.connect(
new InetSocketAddress("localhost", port),
new InetSocketAddress(clientPort));
@@ -245,7 +245,11 @@
@Override
public void exceptionCaught(IoSession session, Throwable cause) {
- cause.printStackTrace();
+ IoSessionLogger.getLogger(session).warn(cause);
+ if (cause instanceof WriteException) {
+ WriteException e = (WriteException) cause;
+ IoSessionLogger.getLogger(session).warn("Failed write requests: {}", e.getRequests());
+ }
}
}
}