You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by ma...@apache.org on 2014/11/10 17:46:03 UTC
svn commit: r1637924 - in /tomcat/trunk/java/org/apache:
coyote/http11/upgrade/Nio2ServletInputStream.java
tomcat/util/net/Nio2Endpoint.java
Author: markt
Date: Mon Nov 10 16:46:03 2014
New Revision: 1637924
URL: http://svn.apache.org/r1637924
Log:
Push read methods down into Nio2SocketWrapper
Modified:
tomcat/trunk/java/org/apache/coyote/http11/upgrade/Nio2ServletInputStream.java
tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java
Modified: tomcat/trunk/java/org/apache/coyote/http11/upgrade/Nio2ServletInputStream.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/upgrade/Nio2ServletInputStream.java?rev=1637924&r1=1637923&r2=1637924&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http11/upgrade/Nio2ServletInputStream.java (original)
+++ tomcat/trunk/java/org/apache/coyote/http11/upgrade/Nio2ServletInputStream.java Mon Nov 10 16:46:03 2014
@@ -16,208 +16,32 @@
*/
package org.apache.coyote.http11.upgrade;
-import java.io.EOFException;
import java.io.IOException;
-import java.net.SocketTimeoutException;
-import java.nio.ByteBuffer;
-import java.nio.channels.AsynchronousCloseException;
-import java.nio.channels.CompletionHandler;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
import org.apache.tomcat.util.net.Nio2Channel;
-import org.apache.tomcat.util.net.Nio2Endpoint;
-import org.apache.tomcat.util.net.SocketStatus;
+import org.apache.tomcat.util.net.Nio2Endpoint.Nio2SocketWrapper;
import org.apache.tomcat.util.net.SocketWrapperBase;
public class Nio2ServletInputStream extends AbstractServletInputStream {
private final SocketWrapperBase<Nio2Channel> wrapper;
- private final Nio2Channel channel;
- private final CompletionHandler<Integer, SocketWrapperBase<Nio2Channel>> completionHandler;
- private boolean flipped = false;
- private volatile boolean readPending = false;
- private volatile boolean interest = true;
public Nio2ServletInputStream(SocketWrapperBase<Nio2Channel> wrapper) {
this.wrapper = wrapper;
- this.channel = wrapper.getSocket();
- this.completionHandler = new CompletionHandler<Integer, SocketWrapperBase<Nio2Channel>>() {
- @Override
- public void completed(Integer nBytes, SocketWrapperBase<Nio2Channel> attachment) {
- boolean notify = false;
- synchronized (completionHandler) {
- if (nBytes.intValue() < 0) {
- failed(new EOFException(), attachment);
- } else {
- readPending = false;
- if (interest && !Nio2Endpoint.isInline()) {
- interest = false;
- notify = true;
- }
- }
- }
- if (notify) {
- wrapper.getEndpoint().processSocket(attachment, SocketStatus.OPEN_READ, false);
- }
- }
- @Override
- public void failed(Throwable exc, SocketWrapperBase<Nio2Channel> attachment) {
- attachment.setError(true);
- readPending = false;
- if (exc instanceof AsynchronousCloseException) {
- // If already closed, don't call onError and close again
- return;
- }
- onError(exc);
- wrapper.getEndpoint().processSocket(attachment, SocketStatus.ERROR, true);
- }
- };
}
@Override
protected boolean doIsReady() throws IOException {
- synchronized (completionHandler) {
- if (readPending) {
- interest = true;
- return false;
- }
- ByteBuffer readBuffer = channel.getBufHandler().getReadBuffer();
- if (!flipped) {
- readBuffer.flip();
- flipped = true;
- }
- if (readBuffer.remaining() > 0) {
- return true;
- }
-
- readBuffer.clear();
- flipped = false;
- int nRead = fillReadBuffer(false);
-
- boolean isReady = nRead > 0;
- if (isReady) {
- if (!flipped) {
- readBuffer.flip();
- flipped = true;
- }
- } else {
- interest = true;
- }
- return isReady;
- }
+ return ((Nio2SocketWrapper) wrapper).doIsReady();
}
@Override
- protected int doRead(boolean block, byte[] b, int off, int len)
- throws IOException {
-
- synchronized (completionHandler) {
- if (readPending) {
- return 0;
- }
-
- ByteBuffer readBuffer = channel.getBufHandler().getReadBuffer();
-
- if (!flipped) {
- readBuffer.flip();
- flipped = true;
- }
- int remaining = readBuffer.remaining();
- // Is there enough data in the read buffer to satisfy this request?
- if (remaining >= len) {
- readBuffer.get(b, off, len);
- return len;
- }
-
- // Copy what data there is in the read buffer to the byte array
- int leftToWrite = len;
- int newOffset = off;
- if (remaining > 0) {
- readBuffer.get(b, off, remaining);
- leftToWrite -= remaining;
- newOffset += remaining;
- }
-
- // Fill the read buffer as best we can
- readBuffer.clear();
- flipped = false;
- int nRead = fillReadBuffer(block);
-
- // Full as much of the remaining byte array as possible with the data
- // that was just read
- if (nRead > 0) {
- if (!flipped) {
- readBuffer.flip();
- flipped = true;
- }
- if (nRead > leftToWrite) {
- readBuffer.get(b, newOffset, leftToWrite);
- leftToWrite = 0;
- } else {
- readBuffer.get(b, newOffset, nRead);
- leftToWrite -= nRead;
- }
- } else if (nRead == 0) {
- if (block) {
- if (!flipped) {
- readBuffer.flip();
- flipped = true;
- }
- }
- } else if (nRead == -1) {
- throw new EOFException();
- }
-
- return len - leftToWrite;
- }
+ protected int doRead(boolean block, byte[] b, int off, int len) throws IOException {
+ return ((Nio2SocketWrapper) wrapper).doRead(block, b, off, len);
}
@Override
protected void doClose() throws IOException {
- channel.close();
- }
-
- private int fillReadBuffer(boolean block) throws IOException {
- ByteBuffer readBuffer = channel.getBufHandler().getReadBuffer();
- int nRead = 0;
- if (block) {
- readPending = true;
- readBuffer.clear();
- flipped = false;
- try {
- nRead = channel.read(readBuffer)
- .get(wrapper.getTimeout(), TimeUnit.MILLISECONDS).intValue();
- readPending = false;
- } catch (ExecutionException e) {
- if (e.getCause() instanceof IOException) {
- onError(e.getCause());
- throw (IOException) e.getCause();
- } else {
- onError(e);
- throw new IOException(e);
- }
- } catch (InterruptedException e) {
- onError(e);
- throw new IOException(e);
- } catch (TimeoutException e) {
- SocketTimeoutException ex = new SocketTimeoutException();
- onError(ex);
- throw ex;
- }
- } else {
- readPending = true;
- readBuffer.clear();
- flipped = false;
- Nio2Endpoint.startInline();
- channel.read(readBuffer,
- wrapper.getTimeout(), TimeUnit.MILLISECONDS, wrapper, completionHandler);
- Nio2Endpoint.endInline();
- if (!readPending) {
- nRead = readBuffer.position();
- }
- }
- return nRead;
+ ((Nio2SocketWrapper) wrapper).doClose();
}
}
Modified: tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java?rev=1637924&r1=1637923&r2=1637924&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java Mon Nov 10 16:46:03 2014
@@ -22,18 +22,22 @@ import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
+import java.net.SocketTimeoutException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousChannelGroup;
+import java.nio.channels.AsynchronousCloseException;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.CompletionHandler;
import java.nio.channels.FileChannel;
import java.nio.file.StandardOpenOption;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import javax.net.ssl.KeyManager;
import javax.net.ssl.SSLContext;
@@ -732,8 +736,43 @@ public class Nio2Endpoint extends Abstra
private SendfileData sendfileData = null;
private boolean upgradeInit = false;
+ private final CompletionHandler<Integer, SocketWrapperBase<Nio2Channel>> completionHandler;
+ private boolean flipped = false;
+ private volatile boolean readPending = false;
+ private volatile boolean interest = true;
+
public Nio2SocketWrapper(Nio2Channel channel, Nio2Endpoint endpoint) {
super(channel, endpoint);
+ this.completionHandler = new CompletionHandler<Integer, SocketWrapperBase<Nio2Channel>>() {
+ @Override
+ public void completed(Integer nBytes, SocketWrapperBase<Nio2Channel> attachment) {
+ boolean notify = false;
+ synchronized (completionHandler) {
+ if (nBytes.intValue() < 0) {
+ failed(new EOFException(), attachment);
+ } else {
+ readPending = false;
+ if (interest && !Nio2Endpoint.isInline()) {
+ interest = false;
+ notify = true;
+ }
+ }
+ }
+ if (notify) {
+ getEndpoint().processSocket(attachment, SocketStatus.OPEN_READ, false);
+ }
+ }
+ @Override
+ public void failed(Throwable exc, SocketWrapperBase<Nio2Channel> attachment) {
+ attachment.setError(true);
+ readPending = false;
+ if (exc instanceof AsynchronousCloseException) {
+ // If already closed, don't call onError and close again
+ return;
+ }
+ getEndpoint().processSocket(attachment, SocketStatus.ERROR, true);
+ }
+ };
}
@Override
@@ -766,6 +805,144 @@ public class Nio2Endpoint extends Abstra
public void setSendfileData(SendfileData sf) { this.sendfileData = sf; }
public SendfileData getSendfileData() { return this.sendfileData; }
+ public boolean doIsReady() throws IOException {
+ synchronized (completionHandler) {
+ if (readPending) {
+ interest = true;
+ return false;
+ }
+ ByteBuffer readBuffer = getSocket().getBufHandler().getReadBuffer();
+ if (!flipped) {
+ readBuffer.flip();
+ flipped = true;
+ }
+ if (readBuffer.remaining() > 0) {
+ return true;
+ }
+
+ readBuffer.clear();
+ flipped = false;
+ int nRead = fillReadBuffer(false);
+
+ boolean isReady = nRead > 0;
+ if (isReady) {
+ if (!flipped) {
+ readBuffer.flip();
+ flipped = true;
+ }
+ } else {
+ interest = true;
+ }
+ return isReady;
+ }
+ }
+
+ public int doRead(boolean block, byte[] b, int off, int len)
+ throws IOException {
+
+ synchronized (completionHandler) {
+ if (readPending) {
+ return 0;
+ }
+
+ ByteBuffer readBuffer = getSocket().getBufHandler().getReadBuffer();
+
+ if (!flipped) {
+ readBuffer.flip();
+ flipped = true;
+ }
+ int remaining = readBuffer.remaining();
+ // Is there enough data in the read buffer to satisfy this request?
+ if (remaining >= len) {
+ readBuffer.get(b, off, len);
+ return len;
+ }
+
+ // Copy what data there is in the read buffer to the byte array
+ int leftToWrite = len;
+ int newOffset = off;
+ if (remaining > 0) {
+ readBuffer.get(b, off, remaining);
+ leftToWrite -= remaining;
+ newOffset += remaining;
+ }
+
+ // Fill the read buffer as best we can
+ readBuffer.clear();
+ flipped = false;
+ int nRead = fillReadBuffer(block);
+
+ // Fill as much of the remaining byte array as possible with the data
+ // that was just read
+ if (nRead > 0) {
+ if (!flipped) {
+ readBuffer.flip();
+ flipped = true;
+ }
+ if (nRead > leftToWrite) {
+ readBuffer.get(b, newOffset, leftToWrite);
+ leftToWrite = 0;
+ } else {
+ readBuffer.get(b, newOffset, nRead);
+ leftToWrite -= nRead;
+ }
+ } else if (nRead == 0) {
+ if (block) {
+ if (!flipped) {
+ readBuffer.flip();
+ flipped = true;
+ }
+ }
+ } else if (nRead == -1) {
+ throw new EOFException();
+ }
+
+ return len - leftToWrite;
+ }
+ }
+
+ public void doClose() throws IOException {
+ getSocket().close();
+ }
+
+ private int fillReadBuffer(boolean block) throws IOException {
+ ByteBuffer readBuffer = getSocket().getBufHandler().getReadBuffer();
+ int nRead = 0;
+ if (block) {
+ readPending = true;
+ readBuffer.clear();
+ flipped = false;
+ try {
+ nRead = getSocket().read(readBuffer)
+ .get(getTimeout(), TimeUnit.MILLISECONDS).intValue();
+ readPending = false;
+ } catch (ExecutionException e) {
+ if (e.getCause() instanceof IOException) {
+ throw (IOException) e.getCause();
+ } else {
+ throw new IOException(e);
+ }
+ } catch (InterruptedException e) {
+ throw new IOException(e);
+ } catch (TimeoutException e) {
+ SocketTimeoutException ex = new SocketTimeoutException();
+ throw ex;
+ }
+ } else {
+ readPending = true;
+ readBuffer.clear();
+ flipped = false;
+ Nio2Endpoint.startInline();
+ getSocket().read(readBuffer, getTimeout(), TimeUnit.MILLISECONDS,
+ this, completionHandler);
+ Nio2Endpoint.endInline();
+ if (!readPending) {
+ nRead = readBuffer.position();
+ }
+ }
+ return nRead;
+ }
+
}
// ------------------------------------------------ Application Buffer Handler
---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org