You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by tr...@apache.org on 2007/11/07 12:02:05 UTC
svn commit: r592697 - in /mina/trunk/core/src:
main/java/org/apache/mina/common/
main/java/org/apache/mina/transport/socket/nio/
main/java/org/apache/mina/transport/vmpipe/ main/java/org/apache/mina/util/
test/java/org/apache/mina/filter/codec/textline/
Author: trustin
Date: Wed Nov 7 03:01:58 2007
New Revision: 592697
URL: http://svn.apache.org/viewvc?rev=592697&view=rev
Log:
* Replaced Queue<WriteRequest> with WriteRequestQueue which provides dispose() method which is needed for a certain queue implementation like JMS queue.
* Optimized CircularQueue.clear() a little bit.
Added:
mina/trunk/core/src/main/java/org/apache/mina/common/WriteRequestQueue.java (with props)
Modified:
mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoProcessor.java
mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoSession.java
mina/trunk/core/src/main/java/org/apache/mina/common/DefaultIoFilterChain.java
mina/trunk/core/src/main/java/org/apache/mina/common/DefaultIoSessionDataStructureFactory.java
mina/trunk/core/src/main/java/org/apache/mina/common/DummySession.java
mina/trunk/core/src/main/java/org/apache/mina/common/IdleStatusChecker.java
mina/trunk/core/src/main/java/org/apache/mina/common/IoSessionDataStructureFactory.java
mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioDatagramAcceptor.java
mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioDatagramSession.java
mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeFilterChain.java
mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeSessionImpl.java
mina/trunk/core/src/main/java/org/apache/mina/util/CircularQueue.java
mina/trunk/core/src/test/java/org/apache/mina/filter/codec/textline/TextLineDecoderTest.java
Modified: mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoProcessor.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoProcessor.java?rev=592697&r1=592696&r2=592697&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoProcessor.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoProcessor.java Wed Nov 7 03:01:58 2007
@@ -364,7 +364,8 @@
case OPEN:
try {
boolean flushedAll = flush(session);
- if (flushedAll && !session.getWriteRequestQueue().isEmpty() && !session.isScheduledForFlush()) {
+ if (flushedAll && !session.getWriteRequestQueue().isEmpty(session) &&
+ !session.isScheduledForFlush()) {
scheduleFlush(session);
}
} catch (Exception e) {
@@ -387,12 +388,12 @@
}
private void clearWriteRequestQueue(AbstractIoSession session) {
- Queue<WriteRequest> writeRequestQueue = session.getWriteRequestQueue();
+ WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue();
WriteRequest req;
List<WriteRequest> failedRequests = new ArrayList<WriteRequest>();
- if ((req = writeRequestQueue.poll()) != null) {
+ if ((req = writeRequestQueue.poll(session)) != null) {
Object m = req.getMessage();
if (m instanceof IoBuffer) {
IoBuffer buf = (IoBuffer) req.getMessage();
@@ -410,7 +411,7 @@
}
// Discard others.
- while ((req = writeRequestQueue.poll()) != null) {
+ while ((req = writeRequestQueue.poll(session)) != null) {
failedRequests.add(req);
}
}
@@ -435,7 +436,7 @@
// Clear OP_WRITE
setOpWrite(session, false);
- Queue<WriteRequest> writeRequestQueue = session.getWriteRequestQueue();
+ WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue();
// Set limitation for the number of written bytes for read-write
// fairness. I used maxReadBufferSize * 3 / 2, which yields best
@@ -446,10 +447,13 @@
do {
// Check for pending writes.
- WriteRequest req = writeRequestQueue.peek();
-
+ WriteRequest req = session.getCurrentWriteRequest();
if (req == null) {
- break;
+ req = writeRequestQueue.poll(session);
+ if (req == null) {
+ break;
+ }
+ session.setCurrentWriteRequest(req);
}
long localWrittenBytes = 0;
@@ -458,8 +462,8 @@
FileRegion region = (FileRegion) message;
if (region.getCount() <= 0) {
- // File has been sent, remove from queue
- writeRequestQueue.poll();
+ // File has been sent, clear the current request.
+ session.setCurrentWriteRequest(null);
session.getFilterChain().fireMessageSent(req);
continue;
}
@@ -469,8 +473,8 @@
} else {
IoBuffer buf = (IoBuffer) message;
if (buf.remaining() == 0) {
- // Buffer has been completely sent, remove request form queue
- writeRequestQueue.poll();
+ // Buffer has been sent, clear the current request.
+ session.setCurrentWriteRequest(null);
buf.reset();
session.getFilterChain().fireMessageSent(req);
continue;
@@ -518,7 +522,7 @@
try {
setOpWrite(
session,
- !session.getWriteRequestQueue().isEmpty() &&
+ !session.getWriteRequestQueue().isEmpty(session) &&
(mask & SelectionKey.OP_WRITE) != 0);
} catch (Exception e) {
session.getFilterChain().fireExceptionCaught(e);
Modified: mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoSession.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoSession.java?rev=592697&r1=592696&r2=592697&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoSession.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoSession.java Wed Nov 7 03:01:58 2007
@@ -24,9 +24,6 @@
import java.io.IOException;
import java.net.SocketAddress;
import java.nio.channels.FileChannel;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.Queue;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -60,8 +57,8 @@
private final Object lock = new Object();
private IoSessionAttributeMap attributes;
- private Queue<WriteRequest> writeRequestQueue;
-
+ private WriteRequestQueue writeRequestQueue;
+ private WriteRequest currentWriteRequest;
private final long creationTime;
/**
@@ -74,33 +71,22 @@
// Status variables
private final AtomicBoolean scheduledForFlush = new AtomicBoolean();
-
private final AtomicLong scheduledWriteBytes = new AtomicLong();
-
private final AtomicInteger scheduledWriteMessages = new AtomicInteger();
private long readBytes;
-
private long writtenBytes;
-
private long readMessages;
-
private long writtenMessages;
-
private long lastReadTime;
-
private long lastWriteTime;
private int idleCountForBoth;
-
private int idleCountForRead;
-
private int idleCountForWrite;
private long lastIdleTimeForBoth;
-
private long lastIdleTimeForRead;
-
private long lastIdleTimeForWrite;
private boolean deferDecreaseReadBuffer = true;
@@ -161,7 +147,7 @@
}
public CloseFuture closeOnFlush() {
- getWriteRequestQueue().offer(CLOSE_REQUEST);
+ getWriteRequestQueue().offer(this, CLOSE_REQUEST);
getProcessor().flush(this);
return closeFuture;
}
@@ -286,8 +272,9 @@
this.attributes = attributes;
}
- protected void setWriteRequestQueue(Queue<WriteRequest> writeRequestQueue) {
- this.writeRequestQueue = new WriteRequestQueue(writeRequestQueue);
+ protected void setWriteRequestQueue(WriteRequestQueue writeRequestQueue) {
+ this.writeRequestQueue =
+ new CloseRequestAwareWriteRequestQueue(writeRequestQueue);
}
public TrafficMask getTrafficMask() {
@@ -441,9 +428,17 @@
}
}
- protected Queue<WriteRequest> getWriteRequestQueue() {
+ protected WriteRequestQueue getWriteRequestQueue() {
return writeRequestQueue;
}
+
+ protected WriteRequest getCurrentWriteRequest() {
+ return currentWriteRequest;
+ }
+
+ protected void setCurrentWriteRequest(WriteRequest currentWriteRequest) {
+ this.currentWriteRequest = currentWriteRequest;
+ }
protected void increaseReadBufferSize() {
int newReadBufferSize = getConfig().getReadBufferSize() << 1;
@@ -589,97 +584,38 @@
}
}
- private class WriteRequestQueue implements Queue<WriteRequest> {
+ private class CloseRequestAwareWriteRequestQueue implements WriteRequestQueue {
- private final Queue<WriteRequest> q;
+ private final WriteRequestQueue q;
- public WriteRequestQueue(Queue<WriteRequest> q) {
+ public CloseRequestAwareWriteRequestQueue(WriteRequestQueue q) {
this.q = q;
}
- // Discard close request offered by closeOnFlush() silently.
- public WriteRequest peek() {
- WriteRequest answer = q.peek();
+ public synchronized WriteRequest poll(IoSession session) {
+ WriteRequest answer = q.poll(session);
if (answer == CLOSE_REQUEST) {
AbstractIoSession.this.close();
- clear();
+ dispose(session);
answer = null;
}
return answer;
}
-
- public synchronized WriteRequest poll() {
- WriteRequest answer = q.poll();
- if (answer == CLOSE_REQUEST) {
- AbstractIoSession.this.close();
- clear();
- answer = null;
- }
- return answer;
- }
-
- public boolean add(WriteRequest e) {
- return q.add(e);
- }
-
- public WriteRequest element() {
- return q.element();
- }
-
- public boolean offer(WriteRequest e) {
- return q.offer(e);
- }
-
- public WriteRequest remove() {
- return q.remove();
- }
-
- public boolean addAll(Collection<? extends WriteRequest> c) {
- return q.addAll(c);
- }
-
- public void clear() {
- q.clear();
- }
-
- public boolean contains(Object o) {
- return q.contains(o);
- }
-
- public boolean containsAll(Collection<?> c) {
- return q.containsAll(c);
- }
-
- public boolean isEmpty() {
- return q.isEmpty();
- }
-
- public Iterator<WriteRequest> iterator() {
- return q.iterator();
- }
-
- public boolean remove(Object o) {
- return q.remove(o);
- }
-
- public boolean removeAll(Collection<?> c) {
- return q.removeAll(c);
- }
-
- public boolean retainAll(Collection<?> c) {
- return q.retainAll(c);
+
+ public void offer(IoSession session, WriteRequest e) {
+ q.offer(session, e);
}
- public int size() {
- return q.size();
+ public boolean isEmpty(IoSession session) {
+ return q.isEmpty(session);
}
-
- public Object[] toArray() {
- return q.toArray();
+
+ public void clear(IoSession session) {
+ q.clear(session);
}
- public <T> T[] toArray(T[] a) {
- return q.toArray(a);
+ public void dispose(IoSession session) {
+ q.dispose(session);
}
}
}
Modified: mina/trunk/core/src/main/java/org/apache/mina/common/DefaultIoFilterChain.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/DefaultIoFilterChain.java?rev=592697&r1=592696&r2=592697&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/DefaultIoFilterChain.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/DefaultIoFilterChain.java Wed Nov 7 03:01:58 2007
@@ -632,7 +632,7 @@
s.increaseScheduledWriteMessages();
}
- s.getWriteRequestQueue().add(writeRequest);
+ s.getWriteRequestQueue().offer(s, writeRequest);
if (s.getTrafficMask().isWritable()) {
s.getProcessor().flush(s);
}
@@ -683,10 +683,14 @@
session.getHandler().sessionClosed(session);
} finally {
try {
- ((AbstractIoSession) session).getAttributeMap().dispose(session);
+ ((AbstractIoSession) session).getWriteRequestQueue().dispose(session);
} finally {
- // Remove all filters.
- session.getFilterChain().clear();
+ try {
+ ((AbstractIoSession) session).getAttributeMap().dispose(session);
+ } finally {
+ // Remove all filters.
+ session.getFilterChain().clear();
+ }
}
}
}
Modified: mina/trunk/core/src/main/java/org/apache/mina/common/DefaultIoSessionDataStructureFactory.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/DefaultIoSessionDataStructureFactory.java?rev=592697&r1=592696&r2=592697&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/DefaultIoSessionDataStructureFactory.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/DefaultIoSessionDataStructureFactory.java Wed Nov 7 03:01:58 2007
@@ -27,7 +27,6 @@
import java.util.Set;
import org.apache.mina.util.CircularQueue;
-import org.apache.mina.util.SynchronizedQueue;
/**
* The default {@link IoSessionDataStructureFactory} implementation
@@ -46,9 +45,9 @@
return new DefaultIoSessionAttributeMap();
}
- public Queue<WriteRequest> getWriteRequestQueue(IoSession session)
+ public WriteRequestQueue getWriteRequestQueue(IoSession session)
throws Exception {
- return new SynchronizedQueue<WriteRequest>(new CircularQueue<WriteRequest>(128));
+ return new DefaultWriteRequestQueue();
}
private static class DefaultIoSessionAttributeMap implements IoSessionAttributeMap {
@@ -169,6 +168,35 @@
}
public void dispose(IoSession session) throws Exception {
+ }
+ }
+
+ private static class DefaultWriteRequestQueue implements WriteRequestQueue {
+
+ private final Queue<WriteRequest> q = new CircularQueue<WriteRequest>(128);
+
+ public void dispose(IoSession session) {
+ }
+
+ public void clear(IoSession session) {
+ q.clear();
+ }
+
+ public synchronized boolean isEmpty(IoSession session) {
+ return q.isEmpty();
+ }
+
+ public synchronized void offer(IoSession session, WriteRequest writeRequest) {
+ q.offer(writeRequest);
+ }
+
+ public synchronized WriteRequest poll(IoSession session) {
+ return q.poll();
+ }
+
+ @Override
+ public String toString() {
+ return q.toString();
}
}
}
Modified: mina/trunk/core/src/main/java/org/apache/mina/common/DummySession.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/DummySession.java?rev=592697&r1=592696&r2=592697&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/DummySession.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/DummySession.java Wed Nov 7 03:01:58 2007
@@ -105,7 +105,7 @@
public void flush(IoSession session) {
getFilterChain().fireMessageSent(
- ((DummySession) session).getWriteRequestQueue().poll());
+ ((DummySession) session).getWriteRequestQueue().poll(session));
}
public void remove(IoSession session) {
Modified: mina/trunk/core/src/main/java/org/apache/mina/common/IdleStatusChecker.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/IdleStatusChecker.java?rev=592697&r1=592696&r2=592697&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/IdleStatusChecker.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/IdleStatusChecker.java Wed Nov 7 03:01:58 2007
@@ -21,7 +21,6 @@
import java.util.HashSet;
import java.util.Iterator;
-import java.util.Queue;
import java.util.Set;
/**
@@ -137,12 +136,12 @@
AbstractIoSession s = (AbstractIoSession) session;
if (writeTimeout > 0 && currentTime - lastIoTime >= writeTimeout &&
- !s.getWriteRequestQueue().isEmpty()) {
- Queue<WriteRequest> queue = s.getWriteRequestQueue();
- WriteRequest request = queue.peek();
+ !s.getWriteRequestQueue().isEmpty(session)) {
+ WriteRequest request = s.getCurrentWriteRequest();
if (request != null) {
+ s.setCurrentWriteRequest(null);
WriteTimeoutException cause = new WriteTimeoutException(request);
- queue.poll().getFuture().setException(cause);
+ request.getFuture().setException(cause);
s.getFilterChain().fireExceptionCaught(cause);
// WriteException is an IOException, so we close the session.
s.close();
Modified: mina/trunk/core/src/main/java/org/apache/mina/common/IoSessionDataStructureFactory.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/IoSessionDataStructureFactory.java?rev=592697&r1=592696&r2=592697&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/IoSessionDataStructureFactory.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/IoSessionDataStructureFactory.java Wed Nov 7 03:01:58 2007
@@ -20,7 +20,6 @@
package org.apache.mina.common;
import java.util.Comparator;
-import java.util.Queue;
/**
* Provides data structures to a newly created session.
@@ -37,12 +36,12 @@
IoSessionAttributeMap getAttributeMap(IoSession session) throws Exception;
/**
- * Returns an {@link Queue} of {@link WriteRequest}s which is going to be
- * associated with the specified <tt>session</tt>. Please note that the
- * returned implementation must be thread-safe and robust enough to deal
+ * Returns an {@link WriteRequest} which is going to be associated with
+ * the specified <tt>session</tt>. Please note that the returned
+ * implementation must be thread-safe and robust enough to deal
* with various messages types (even what you didn't expect at all),
* especially when you are going to implement a priority queue which
* involves {@link Comparator}.
*/
- Queue<WriteRequest> getWriteRequestQueue(IoSession session) throws Exception;
+ WriteRequestQueue getWriteRequestQueue(IoSession session) throws Exception;
}
Added: mina/trunk/core/src/main/java/org/apache/mina/common/WriteRequestQueue.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/WriteRequestQueue.java?rev=592697&view=auto
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/WriteRequestQueue.java (added)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/WriteRequestQueue.java Wed Nov 7 03:01:58 2007
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.mina.common;
+
+
+/**
+ * Stores {@link WriteRequest}s which are queued to an {@link IoSession}.
+ *
+ * @author The Apache MINA Project (dev@mina.apache.org)
+ * @version $Rev$, $Date$
+ */
+public interface WriteRequestQueue {
+
+ WriteRequest poll(IoSession session);
+ void offer(IoSession session, WriteRequest writeRequest);
+ boolean isEmpty(IoSession session);
+ void clear(IoSession session);
+
+ /**
+ * Disposes any releases associated with the specified session.
+ * This method is invoked on disconnection.
+ */
+ void dispose(IoSession session);
+}
Propchange: mina/trunk/core/src/main/java/org/apache/mina/common/WriteRequestQueue.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: mina/trunk/core/src/main/java/org/apache/mina/common/WriteRequestQueue.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioDatagramAcceptor.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioDatagramAcceptor.java?rev=592697&r1=592696&r2=592697&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioDatagramAcceptor.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioDatagramAcceptor.java Wed Nov 7 03:01:58 2007
@@ -44,6 +44,7 @@
import org.apache.mina.common.RuntimeIoException;
import org.apache.mina.common.TransportMetadata;
import org.apache.mina.common.WriteRequest;
+import org.apache.mina.common.WriteRequestQueue;
import org.apache.mina.transport.socket.DatagramAcceptor;
import org.apache.mina.transport.socket.DatagramSessionConfig;
import org.apache.mina.transport.socket.DefaultDatagramSessionConfig;
@@ -366,7 +367,8 @@
try {
boolean flushedAll = flush(session);
- if (flushedAll && !session.getWriteRequestQueue().isEmpty() && !session.isScheduledForFlush()) {
+ if (flushedAll && !session.getWriteRequestQueue().isEmpty(session) &&
+ !session.isScheduledForFlush()) {
scheduleFlush(session);
}
} catch (IOException e) {
@@ -388,20 +390,24 @@
key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
DatagramChannel ch = session.getChannel();
- Queue<WriteRequest> writeRequestQueue = session.getWriteRequestQueue();
+ WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue();
int writtenBytes = 0;
int maxWrittenBytes = session.getConfig().getSendBufferSize() << 1;
for (; ;) {
- WriteRequest req = writeRequestQueue.peek();
+ WriteRequest req = session.getCurrentWriteRequest();
if (req == null) {
- break;
+ req = writeRequestQueue.poll(session);
+ if (req == null) {
+ break;
+ }
+ session.setCurrentWriteRequest(req);
}
IoBuffer buf = (IoBuffer) req.getMessage();
if (buf.remaining() == 0) {
- // pop and fire event
- writeRequestQueue.poll();
+ // Clear and fire event
+ session.setCurrentWriteRequest(null);
buf.reset();
session.getFilterChain().fireMessageSent(req);
continue;
@@ -420,8 +426,8 @@
} else {
key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
- // pop and fire event
- writeRequestQueue.poll();
+ // Clear and fire event
+ session.setCurrentWriteRequest(null);
writtenBytes += localWrittenBytes;
buf.reset();
session.getFilterChain().fireMessageSent(req);
Modified: mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioDatagramSession.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioDatagramSession.java?rev=592697&r1=592696&r2=592697&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioDatagramSession.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioDatagramSession.java Wed Nov 7 03:01:58 2007
@@ -24,7 +24,6 @@
import java.net.SocketException;
import java.nio.channels.DatagramChannel;
import java.nio.channels.SelectionKey;
-import java.util.Queue;
import org.apache.mina.common.DefaultIoFilterChain;
import org.apache.mina.common.DefaultTransportMetadata;
@@ -37,6 +36,7 @@
import org.apache.mina.common.RuntimeIoException;
import org.apache.mina.common.TransportMetadata;
import org.apache.mina.common.WriteRequest;
+import org.apache.mina.common.WriteRequestQueue;
import org.apache.mina.transport.socket.AbstractDatagramSessionConfig;
import org.apache.mina.transport.socket.DatagramSession;
import org.apache.mina.transport.socket.DatagramSessionConfig;
@@ -153,8 +153,18 @@
}
@Override
- protected Queue<WriteRequest> getWriteRequestQueue() {
+ protected WriteRequestQueue getWriteRequestQueue() {
return super.getWriteRequestQueue();
+ }
+
+ @Override
+ protected WriteRequest getCurrentWriteRequest() {
+ return super.getCurrentWriteRequest();
+ }
+
+ @Override
+ protected void setCurrentWriteRequest(WriteRequest currentWriteRequest) {
+ super.setCurrentWriteRequest(currentWriteRequest);
}
@Override
Modified: mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeFilterChain.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeFilterChain.java?rev=592697&r1=592696&r2=592697&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeFilterChain.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeFilterChain.java Wed Nov 7 03:01:58 2007
@@ -33,6 +33,7 @@
import org.apache.mina.common.IoProcessor;
import org.apache.mina.common.IoSession;
import org.apache.mina.common.WriteRequest;
+import org.apache.mina.common.WriteRequestQueue;
import org.apache.mina.common.WriteToClosedSessionException;
/**
@@ -168,15 +169,15 @@
private class VmPipeIoProcessor implements IoProcessor {
public void flush(IoSession session) {
VmPipeSessionImpl s = (VmPipeSessionImpl) session;
- Queue<WriteRequest> queue = s.getWriteRequestQueue();
- if (queue.isEmpty()) {
+ WriteRequestQueue queue = s.getWriteRequestQueue();
+ if (queue.isEmpty(s)) {
return;
}
if (s.isConnected()) {
if (s.getLock().tryLock()) {
try {
WriteRequest req;
- while ((req = queue.poll()) != null) {
+ while ((req = queue.poll(s)) != null) {
Object message = req.getMessage();
Object messageCopy = message;
if (message instanceof IoBuffer) {
@@ -200,7 +201,12 @@
flushPendingDataQueues(s);
}
} else {
- List<WriteRequest> failedRequests = new ArrayList<WriteRequest>(queue);
+ List<WriteRequest> failedRequests = new ArrayList<WriteRequest>();
+ WriteRequest req;
+ while ((req = queue.poll(s)) != null) {
+ failedRequests.add(req);
+ }
+
if (!failedRequests.isEmpty()) {
WriteToClosedSessionException cause = new WriteToClosedSessionException(failedRequests);
for (WriteRequest r: failedRequests) {
Modified: mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeSessionImpl.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeSessionImpl.java?rev=592697&r1=592696&r2=592697&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeSessionImpl.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeSessionImpl.java Wed Nov 7 03:01:58 2007
@@ -19,7 +19,6 @@
*/
package org.apache.mina.transport.vmpipe;
-import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.locks.Lock;
@@ -34,7 +33,7 @@
import org.apache.mina.common.IoServiceListenerSupport;
import org.apache.mina.common.IoSession;
import org.apache.mina.common.TransportMetadata;
-import org.apache.mina.common.WriteRequest;
+import org.apache.mina.common.WriteRequestQueue;
/**
* A {@link IoSession} for in-VM transport (VM_PIPE).
@@ -153,7 +152,7 @@
}
@Override
- protected Queue<WriteRequest> getWriteRequestQueue() {
+ protected WriteRequestQueue getWriteRequestQueue() {
return super.getWriteRequestQueue();
}
Modified: mina/trunk/core/src/main/java/org/apache/mina/util/CircularQueue.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/util/CircularQueue.java?rev=592697&r1=592696&r2=592697&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/util/CircularQueue.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/util/CircularQueue.java Wed Nov 7 03:01:58 2007
@@ -73,10 +73,12 @@
@Override
public void clear() {
- Arrays.fill(items, null);
- first = 0;
- last = 0;
- full = false;
+ if (!isEmpty()) {
+ Arrays.fill(items, null);
+ first = 0;
+ last = 0;
+ full = false;
+ }
}
@SuppressWarnings("unchecked")
Modified: mina/trunk/core/src/test/java/org/apache/mina/filter/codec/textline/TextLineDecoderTest.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/test/java/org/apache/mina/filter/codec/textline/TextLineDecoderTest.java?rev=592697&r1=592696&r2=592697&view=diff
==============================================================================
--- mina/trunk/core/src/test/java/org/apache/mina/filter/codec/textline/TextLineDecoderTest.java (original)
+++ mina/trunk/core/src/test/java/org/apache/mina/filter/codec/textline/TextLineDecoderTest.java Wed Nov 7 03:01:58 2007
@@ -282,8 +282,9 @@
Assert.assertEquals("B", session.getDecoderOutputQueue().poll());
// Make sure OOM is not thrown.
+ System.gc();
long oldFreeMemory = Runtime.getRuntime().freeMemory();
- in = IoBuffer.allocate(1048576 * 16).mark();
+ in = IoBuffer.allocate(1048576 * 16).sweep((byte) ' ').mark();
for (int i = 0; i < 10; i ++) {
decoder.decode(session, in.reset().mark(), out);
Assert.assertEquals(0, session.getDecoderOutputQueue().size());